]> granicus.if.org Git - icinga2/blob - lib/remote/apilistener.cpp
e4695d1355f9d4c52b5d9147b3dc77d04bc31604
[icinga2] / lib / remote / apilistener.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "remote/apilistener.hpp"
21 #include "remote/apilistener.tcpp"
22 #include "remote/jsonrpcconnection.hpp"
23 #include "remote/endpoint.hpp"
24 #include "remote/jsonrpc.hpp"
25 #include "remote/apifunction.hpp"
26 #include "base/convert.hpp"
27 #include "base/netstring.hpp"
28 #include "base/json.hpp"
29 #include "base/configtype.hpp"
30 #include "base/logger.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/stdiostream.hpp"
33 #include "base/perfdatavalue.hpp"
34 #include "base/application.hpp"
35 #include "base/context.hpp"
36 #include "base/statsfunction.hpp"
37 #include "base/exception.hpp"
38 #include <fstream>
39
40 using namespace icinga;
41
42 REGISTER_TYPE(ApiListener);
43
44 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
45 ApiListener::Ptr ApiListener::m_Instance;
46
47 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
48
49 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
50
51 ApiListener::ApiListener(void)
52         : m_SyncQueue(0, 4), m_LogMessageCount(0)
53 {
54         m_RelayQueue.SetName("ApiListener, RelayQueue");
55         m_SyncQueue.SetName("ApiListener, SyncQueue");
56 }
57
58 String ApiListener::GetApiDir(void)
59 {
60         return Application::GetLocalStateDir() + "/lib/icinga2/api/";
61 }
62
63 String ApiListener::GetCertsDir(void)
64 {
65         return Application::GetLocalStateDir() + "/lib/icinga2/certs/";
66 }
67
68 String ApiListener::GetCaDir(void)
69 {
70         return Application::GetLocalStateDir() + "/lib/icinga2/ca/";
71 }
72
73 String ApiListener::GetCertificateRequestsDir(void)
74 {
75         return Application::GetLocalStateDir() + "/lib/icinga2/certificate-requests/";
76 }
77
78 String ApiListener::GetDefaultCertPath(void)
79 {
80         return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
81 }
82
83 String ApiListener::GetDefaultKeyPath(void)
84 {
85         return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
86 }
87
88 String ApiListener::GetDefaultCaPath(void)
89 {
90         return GetCertsDir() + "/ca.crt";
91 }
92
93 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
94 {
95         struct stat st1, st2;
96
97         if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
98                 Log(LogWarning, "ApiListener")
99                         << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
100
101                 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
102                 Utility::CopyFile(oldCertPath, newCertPath);
103         }
104 }
105
106 void ApiListener::OnConfigLoaded(void)
107 {
108         if (m_Instance)
109                 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
110
111         m_Instance = this;
112
113         String defaultCertPath = GetDefaultCertPath();
114         String defaultKeyPath = GetDefaultKeyPath();
115         String defaultCaPath = GetDefaultCaPath();
116
117         /* Migrate certificate location < 2.8 to the new default path. */
118         String oldCertPath = GetCertPath();
119         String oldKeyPath = GetKeyPath();
120         String oldCaPath = GetCaPath();
121
122         CopyCertificateFile(oldCertPath, defaultCertPath);
123         CopyCertificateFile(oldKeyPath, defaultKeyPath);
124         CopyCertificateFile(oldCaPath, defaultCaPath);
125
126         if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
127                 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://www.icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
128         }
129
130         /* set up SSL context */
131         std::shared_ptr<X509> cert;
132         try {
133                 cert = GetX509Certificate(defaultCertPath);
134         } catch (const std::exception&) {
135                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
136                         + defaultCertPath + "'.", GetDebugInfo()));
137         }
138
139         try {
140                 SetIdentity(GetCertificateCN(cert));
141         } catch (const std::exception&) {
142                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
143                         + defaultCertPath + "'.", GetDebugInfo()));
144         }
145
146         Log(LogInformation, "ApiListener")
147                 << "My API identity: " << GetIdentity();
148
149         UpdateSSLContext();
150 }
151
152 void ApiListener::UpdateSSLContext(void)
153 {
154         std::shared_ptr<SSL_CTX> context;
155
156         try {
157                 context = MakeSSLContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath());
158         } catch (const std::exception&) {
159                 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
160                         + GetDefaultCertPath() + "' key path: '" + GetDefaultKeyPath() + "' ca path: '" + GetDefaultCaPath() + "'.", GetDebugInfo()));
161         }
162
163         if (!GetCrlPath().IsEmpty()) {
164                 try {
165                         AddCRLToSSLContext(context, GetCrlPath());
166                 } catch (const std::exception&) {
167                         BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
168                                 + GetCrlPath() + "'.", GetDebugInfo()));
169                 }
170         }
171
172         if (!GetCipherList().IsEmpty()) {
173                 try {
174                         SetCipherListToSSLContext(context, GetCipherList());
175                 } catch (const std::exception&) {
176                         BOOST_THROW_EXCEPTION(ScriptError("Cannot set cipher list to SSL context for cipher list: '"
177                                 + GetCipherList() + "'.", GetDebugInfo()));
178                 }
179         }
180
181         if (!GetTlsProtocolmin().IsEmpty()){
182                 try {
183                         SetTlsProtocolminToSSLContext(context, GetTlsProtocolmin());
184                 } catch (const std::exception&) {
185                         BOOST_THROW_EXCEPTION(ScriptError("Cannot set minimum TLS protocol version to SSL context with tls_protocolmin: '" + GetTlsProtocolmin() + "'.", GetDebugInfo()));
186                 }
187         }
188
189         m_SSLContext = context;
190
191         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
192                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
193                         client->Disconnect();
194                 }
195         }
196
197         for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
198                 client->Disconnect();
199         }
200 }
201
202 void ApiListener::OnAllConfigLoaded(void)
203 {
204         m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
205
206         if (!m_LocalEndpoint)
207                 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
208 }
209
210 /**
211  * Starts the component.
212  */
213 void ApiListener::Start(bool runtimeCreated)
214 {
215         Log(LogInformation, "ApiListener")
216                 << "'" << GetName() << "' started.";
217
218         SyncZoneDirs();
219
220         ObjectImpl<ApiListener>::Start(runtimeCreated);
221
222         {
223                 boost::mutex::scoped_lock(m_LogLock);
224                 RotateLogFile();
225                 OpenLogFile();
226         }
227
228         /* create the primary JSON-RPC listener */
229         if (!AddListener(GetBindHost(), GetBindPort())) {
230                 Log(LogCritical, "ApiListener")
231                         << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
232                 Application::Exit(EXIT_FAILURE);
233         }
234
235         m_Timer = new Timer();
236         m_Timer->OnTimerExpired.connect(std::bind(&ApiListener::ApiTimerHandler, this));
237         m_Timer->SetInterval(5);
238         m_Timer->Start();
239         m_Timer->Reschedule(0);
240
241         m_ReconnectTimer = new Timer();
242         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&ApiListener::ApiReconnectTimerHandler, this));
243         m_ReconnectTimer->SetInterval(60);
244         m_ReconnectTimer->Start();
245         m_ReconnectTimer->Reschedule(0);
246
247         m_AuthorityTimer = new Timer();
248         m_AuthorityTimer->OnTimerExpired.connect(std::bind(&ApiListener::UpdateObjectAuthority));
249         m_AuthorityTimer->SetInterval(30);
250         m_AuthorityTimer->Start();
251
252         m_CleanupCertificateRequestsTimer = new Timer();
253         m_CleanupCertificateRequestsTimer->OnTimerExpired.connect(std::bind(&ApiListener::CleanupCertificateRequestsTimerHandler, this));
254         m_CleanupCertificateRequestsTimer->SetInterval(3600);
255         m_CleanupCertificateRequestsTimer->Start();
256         m_CleanupCertificateRequestsTimer->Reschedule(0);
257
258         OnMasterChanged(true);
259 }
260
261 void ApiListener::Stop(bool runtimeDeleted)
262 {
263         ObjectImpl<ApiListener>::Stop(runtimeDeleted);
264
265         Log(LogInformation, "ApiListener")
266                 << "'" << GetName() << "' stopped.";
267
268         boost::mutex::scoped_lock lock(m_LogLock);
269         CloseLogFile();
270 }
271
272 ApiListener::Ptr ApiListener::GetInstance(void)
273 {
274         return m_Instance;
275 }
276
277 Endpoint::Ptr ApiListener::GetMaster(void) const
278 {
279         Zone::Ptr zone = Zone::GetLocalZone();
280
281         if (!zone)
282                 return nullptr;
283
284         std::vector<String> names;
285
286         for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
287                 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
288                         names.push_back(endpoint->GetName());
289
290         std::sort(names.begin(), names.end());
291
292         return Endpoint::GetByName(*names.begin());
293 }
294
295 bool ApiListener::IsMaster(void) const
296 {
297         Endpoint::Ptr master = GetMaster();
298
299         if (!master)
300                 return false;
301
302         return master == GetLocalEndpoint();
303 }
304
305 /**
306  * Creates a new JSON-RPC listener on the specified port.
307  *
308  * @param node The host the listener should be bound to.
309  * @param service The port to listen on.
310  */
311 bool ApiListener::AddListener(const String& node, const String& service)
312 {
313         ObjectLock olock(this);
314
315         std::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
316
317         if (!sslContext) {
318                 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
319                 return false;
320         }
321
322         Log(LogInformation, "ApiListener")
323                 << "Adding new listener on port '" << service << "'";
324
325         TcpSocket::Ptr server = new TcpSocket();
326
327         try {
328                 server->Bind(node, service, AF_UNSPEC);
329         } catch (const std::exception&) {
330                 Log(LogCritical, "ApiListener")
331                         << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
332                 return false;
333         }
334
335         std::thread thread(std::bind(&ApiListener::ListenerThreadProc, this, server));
336         thread.detach();
337
338         m_Servers.insert(server);
339
340         return true;
341 }
342
343 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
344 {
345         Utility::SetThreadName("API Listener");
346
347         server->Listen();
348
349         for (;;) {
350                 try {
351                         Socket::Ptr client = server->Accept();
352                         std::thread thread(std::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
353                         thread.detach();
354                 } catch (const std::exception&) {
355                         Log(LogCritical, "ApiListener", "Cannot accept new connection.");
356                 }
357         }
358 }
359
360 /**
361  * Creates a new JSON-RPC client and connects to the specified endpoint.
362  *
363  * @param endpoint The endpoint.
364  */
365 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
366 {
367         {
368                 ObjectLock olock(this);
369
370                 std::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
371
372                 if (!sslContext) {
373                         Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
374                         return;
375                 }
376         }
377
378         String host = endpoint->GetHost();
379         String port = endpoint->GetPort();
380
381         Log(LogInformation, "ApiListener")
382                 << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
383
384         TcpSocket::Ptr client = new TcpSocket();
385
386         try {
387                 endpoint->SetConnecting(true);
388                 client->Connect(host, port);
389                 NewClientHandler(client, endpoint->GetName(), RoleClient);
390                 endpoint->SetConnecting(false);
391         } catch (const std::exception& ex) {
392                 endpoint->SetConnecting(false);
393                 client->Close();
394
395                 std::ostringstream info;
396                 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
397                 Log(LogCritical, "ApiListener", info.str());
398                 Log(LogDebug, "ApiListener")
399                         << info.str() << "\n" << DiagnosticInformation(ex);
400         }
401
402         Log(LogInformation, "ApiListener")
403                 << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
404 }
405
406 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
407 {
408         try {
409                 NewClientHandlerInternal(client, hostname, role);
410         } catch (const std::exception& ex) {
411                 Log(LogCritical, "ApiListener")
412                         << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
413
414                 Log(LogDebug, "ApiListener")
415                         << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
416         }
417 }
418
419 /**
420  * Processes a new client connection.
421  *
422  * @param client The new client.
423  */
424 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
425 {
426         CONTEXT("Handling new API client connection");
427
428         String conninfo;
429
430         if (role == RoleClient)
431                 conninfo = "to";
432         else
433                 conninfo = "from";
434
435         conninfo += " " + client->GetPeerAddress();
436
437         TlsStream::Ptr tlsStream;
438
439         {
440                 ObjectLock olock(this);
441                 try {
442                         tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
443                 } catch (const std::exception&) {
444                         Log(LogCritical, "ApiListener")
445                                 << "Cannot create TLS stream from client connection (" << conninfo << ")";
446                         return;
447                 }
448         }
449
450         try {
451                 tlsStream->Handshake();
452         } catch (const std::exception&) {
453                 Log(LogCritical, "ApiListener")
454                         << "Client TLS handshake failed (" << conninfo << ")";
455                 return;
456         }
457
458         std::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
459         String identity;
460         Endpoint::Ptr endpoint;
461         bool verify_ok = false;
462
463         if (cert) {
464                 try {
465                         identity = GetCertificateCN(cert);
466                 } catch (const std::exception&) {
467                         Log(LogCritical, "ApiListener")
468                                 << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
469                         return;
470                 }
471
472                 verify_ok = tlsStream->IsVerifyOK();
473                 if (!hostname.IsEmpty()) {
474                         if (identity != hostname) {
475                                 Log(LogWarning, "ApiListener")
476                                         << "Unexpected certificate common name while connecting to endpoint '"
477                                         << hostname << "': got '" << identity << "'";
478                                 return;
479                         } else if (!verify_ok) {
480                                 Log(LogWarning, "ApiListener")
481                                         << "Certificate validation failed for endpoint '" << hostname
482                                         << "': " << tlsStream->GetVerifyError();
483                         }
484                 }
485
486                 if (verify_ok)
487                         endpoint = Endpoint::GetByName(identity);
488
489                 {
490                         Log log(LogInformation, "ApiListener");
491
492                         log << "New client connection for identity '" << identity << "' " << conninfo;
493
494                         if (!verify_ok)
495                                 log << " (certificate validation failed: " << tlsStream->GetVerifyError() << ")";
496                         else if (!endpoint)
497                                 log << " (no Endpoint object found for identity)";
498                 }
499         } else {
500                 Log(LogInformation, "ApiListener")
501                         << "New client connection " << conninfo << " (no client certificate)";
502         }
503
504         ClientType ctype;
505
506         if (role == RoleClient) {
507                 Dictionary::Ptr message = new Dictionary();
508                 message->Set("jsonrpc", "2.0");
509                 message->Set("method", "icinga::Hello");
510                 message->Set("params", new Dictionary());
511                 JsonRpc::SendMessage(tlsStream, message);
512                 ctype = ClientJsonRpc;
513         } else {
514                 tlsStream->WaitForData(5);
515
516                 if (!tlsStream->IsDataAvailable()) {
517                         Log(LogWarning, "ApiListener")
518                                 << "No data received on new API connection for identity '" << identity << "'. Ensure that the remote endpoints are properly configured in a cluster setup.";
519                         return;
520                 }
521
522                 char firstByte;
523                 tlsStream->Peek(&firstByte, 1, false);
524
525                 if (firstByte >= '0' && firstByte <= '9')
526                         ctype = ClientJsonRpc;
527                 else
528                         ctype = ClientHttp;
529         }
530
531         if (ctype == ClientJsonRpc) {
532                 Log(LogNotice, "ApiListener", "New JSON-RPC client");
533
534                 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
535                 aclient->Start();
536
537                 if (endpoint) {
538                         bool needSync = !endpoint->GetConnected();
539
540                         endpoint->AddClient(aclient);
541
542                         m_SyncQueue.Enqueue(std::bind(&ApiListener::SyncClient, this, aclient, endpoint, needSync));
543                 } else
544                         AddAnonymousClient(aclient);
545         } else {
546                 Log(LogNotice, "ApiListener", "New HTTP client");
547
548                 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
549                 aclient->Start();
550                 AddHttpClient(aclient);
551         }
552 }
553
554 void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
555 {
556         Zone::Ptr eZone = endpoint->GetZone();
557
558         try {
559                 {
560                         ObjectLock olock(endpoint);
561
562                         endpoint->SetSyncing(true);
563                 }
564
565                 Zone::Ptr myZone = Zone::GetLocalZone();
566
567                 if (myZone->GetParent() == eZone) {
568                         Log(LogInformation, "ApiListener")
569                                 << "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";
570
571                         JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());
572
573                         if (Utility::PathExists(ApiListener::GetCertificateRequestsDir()))
574                                 Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", std::bind(&JsonRpcConnection::SendCertificateRequest, aclient, nullptr, _1), GlobFile);
575                 }
576
577                 /* Make sure that the config updates are synced
578                  * before the logs are replayed.
579                  */
580
581                 Log(LogInformation, "ApiListener")
582                         << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
583
584                 /* sync zone file config */
585                 SendConfigUpdate(aclient);
586
587                 Log(LogInformation, "ApiListener")
588                         << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
589
590                 /* sync runtime config */
591                 SendRuntimeConfigObjects(aclient);
592
593                 Log(LogInformation, "ApiListener")
594                         << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
595
596                 if (!needSync) {
597                         ObjectLock olock2(endpoint);
598                         endpoint->SetSyncing(false);
599                         return;
600                 }
601
602                 Log(LogInformation, "ApiListener")
603                         << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
604
605                 ReplayLog(aclient);
606
607                 if (eZone == Zone::GetLocalZone())
608                         UpdateObjectAuthority();
609
610                 Log(LogInformation, "ApiListener")
611                         << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
612         } catch (const std::exception& ex) {
613                 {
614                         ObjectLock olock2(endpoint);
615                         endpoint->SetSyncing(false);
616                 }
617
618                 Log(LogCritical, "ApiListener")
619                         << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
620
621                 Log(LogDebug, "ApiListener")
622                         << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
623         }
624
625         Log(LogInformation, "ApiListener")
626                 << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
627 }
628
629 void ApiListener::ApiTimerHandler(void)
630 {
631         double now = Utility::GetTime();
632
633         std::vector<int> files;
634         Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
635         std::sort(files.begin(), files.end());
636
637         for (int ts : files) {
638                 bool need = false;
639
640                 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
641                         if (endpoint == GetLocalEndpoint())
642                                 continue;
643
644                         if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
645                                 continue;
646
647                         if (ts > endpoint->GetLocalLogPosition()) {
648                                 need = true;
649                                 break;
650                         }
651                 }
652
653                 if (!need) {
654                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
655                         Log(LogNotice, "ApiListener")
656                                 << "Removing old log file: " << path;
657                         (void)unlink(path.CStr());
658                 }
659         }
660
661         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
662                 if (!endpoint->GetConnected())
663                         continue;
664
665                 double ts = endpoint->GetRemoteLogPosition();
666
667                 if (ts == 0)
668                         continue;
669
670                 Dictionary::Ptr lparams = new Dictionary();
671                 lparams->Set("log_position", ts);
672
673                 Dictionary::Ptr lmessage = new Dictionary();
674                 lmessage->Set("jsonrpc", "2.0");
675                 lmessage->Set("method", "log::SetLogPosition");
676                 lmessage->Set("params", lparams);
677
678                 double maxTs = 0;
679
680                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
681                         if (client->GetTimestamp() > maxTs)
682                                 maxTs = client->GetTimestamp();
683                 }
684
685                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
686                         if (client->GetTimestamp() != maxTs)
687                                 client->Disconnect();
688                         else
689                                 client->SendMessage(lmessage);
690                 }
691
692                 Log(LogNotice, "ApiListener")
693                         << "Setting log position for identity '" << endpoint->GetName() << "': "
694                         << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
695         }
696 }
697
698 void ApiListener::ApiReconnectTimerHandler(void)
699 {
700         Zone::Ptr my_zone = Zone::GetLocalZone();
701
702         for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
703                 /* don't connect to global zones */
704                 if (zone->GetGlobal())
705                         continue;
706
707                 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
708                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
709                         Log(LogDebug, "ApiListener")
710                                 << "Not connecting to Zone '" << zone->GetName()
711                                 << "' because it's not in the same zone, a parent or a child zone.";
712                         continue;
713                 }
714
715                 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
716                         /* don't connect to ourselves */
717                         if (endpoint == GetLocalEndpoint()) {
718                                 Log(LogDebug, "ApiListener")
719                                         << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
720                                 continue;
721                         }
722
723                         /* don't try to connect to endpoints which don't have a host and port */
724                         if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
725                                 Log(LogDebug, "ApiListener")
726                                         << "Not connecting to Endpoint '" << endpoint->GetName()
727                                         << "' because the host/port attributes are missing.";
728                                 continue;
729                         }
730
731                         /* don't try to connect if there's already a connection attempt */
732                         if (endpoint->GetConnecting()) {
733                                 Log(LogDebug, "ApiListener")
734                                         << "Not connecting to Endpoint '" << endpoint->GetName()
735                                         << "' because we're already trying to connect to it.";
736                                 continue;
737                         }
738
739                         /* don't try to connect if we're already connected */
740                         if (endpoint->GetConnected()) {
741                                 Log(LogDebug, "ApiListener")
742                                         << "Not connecting to Endpoint '" << endpoint->GetName()
743                                         << "' because we're already connected to it.";
744                                 continue;
745                         }
746
747                         std::thread thread(std::bind(&ApiListener::AddConnection, this, endpoint));
748                         thread.detach();
749                 }
750         }
751
752         Endpoint::Ptr master = GetMaster();
753
754         if (master)
755                 Log(LogNotice, "ApiListener")
756                         << "Current zone master: " << master->GetName();
757
758         std::vector<String> names;
759         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
760                 if (endpoint->GetConnected())
761                         names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
762
763         Log(LogNotice, "ApiListener")
764                 << "Connected endpoints: " << Utility::NaturalJoin(names);
765 }
766
767 static void CleanupCertificateRequest(const String& path, double expiryTime)
768 {
769 #ifndef _WIN32
770         struct stat statbuf;
771         if (lstat(path.CStr(), &statbuf) < 0)
772                 return;
773 #else /* _WIN32 */
774         struct _stat statbuf;
775         if (_stat(path.CStr(), &statbuf) < 0)
776                 return;
777 #endif /* _WIN32 */
778
779         if (statbuf.st_mtime < expiryTime)
780                 (void) unlink(path.CStr());
781 }
782
783 void ApiListener::CleanupCertificateRequestsTimerHandler(void)
784 {
785         String requestsDir = GetCertificateRequestsDir();
786
787         if (Utility::PathExists(requestsDir)) {
788                 /* remove certificate requests that are older than a week */
789                 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
790                 Utility::Glob(requestsDir + "/*.json", std::bind(&CleanupCertificateRequest, _1, expiryTime), GlobFile);
791         }
792 }
793
794 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
795         const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
796 {
797         if (!IsActive())
798                 return;
799
800         m_RelayQueue.Enqueue(std::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
801 }
802
803 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
804 {
805         double ts = message->Get("ts");
806
807         ASSERT(ts != 0);
808
809         Dictionary::Ptr pmessage = new Dictionary();
810         pmessage->Set("timestamp", ts);
811
812         pmessage->Set("message", JsonEncode(message));
813
814         if (secobj) {
815                 Dictionary::Ptr secname = new Dictionary();
816                 secname->Set("type", secobj->GetReflectionType()->GetName());
817                 secname->Set("name", secobj->GetName());
818                 pmessage->Set("secobj", secname);
819         }
820
821         boost::mutex::scoped_lock lock(m_LogLock);
822         if (m_LogFile) {
823                 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
824                 m_LogMessageCount++;
825                 SetLogMessageTimestamp(ts);
826
827                 if (m_LogMessageCount > 50000) {
828                         CloseLogFile();
829                         RotateLogFile();
830                         OpenLogFile();
831                 }
832         }
833 }
834
835 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
836 {
837         ObjectLock olock(endpoint);
838
839         if (!endpoint->GetSyncing()) {
840                 Log(LogNotice, "ApiListener")
841                         << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
842
843                 double maxTs = 0;
844
845                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
846                         if (client->GetTimestamp() > maxTs)
847                                 maxTs = client->GetTimestamp();
848                 }
849
850                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
851                         if (client->GetTimestamp() != maxTs)
852                                 continue;
853
854                         client->SendMessage(message);
855                 }
856         }
857 }
858
859 bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentMaster)
860 {
861         ASSERT(targetZone);
862
863         Zone::Ptr myZone = Zone::GetLocalZone();
864
865         /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
866         if (!targetZone->GetGlobal() &&
867                 targetZone != myZone &&
868                 targetZone != myZone->GetParent() &&
869                 targetZone->GetParent() != myZone) {
870                 return true;
871         }
872
873         Endpoint::Ptr myEndpoint = GetLocalEndpoint();
874
875         std::vector<Endpoint::Ptr> skippedEndpoints;
876
877         bool relayed = false, log_needed = false, log_done = false;
878
879         std::set<Endpoint::Ptr> targetEndpoints;
880
881         if (targetZone->GetGlobal()) {
882                 targetEndpoints = myZone->GetEndpoints();
883
884                 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
885                         /* Fetch immediate child zone members */
886                         if (zone->GetParent() == myZone) {
887                                 std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
888                                 targetEndpoints.insert(endpoints.begin(), endpoints.end());
889                         }
890                 }
891         } else {
892                 targetEndpoints = targetZone->GetEndpoints();
893         }
894
895         for (const Endpoint::Ptr& endpoint : targetEndpoints) {
896                 /* don't relay messages to ourselves */
897                 if (endpoint == GetLocalEndpoint())
898                         continue;
899
900                 log_needed = true;
901
902                 /* don't relay messages to disconnected endpoints */
903                 if (!endpoint->GetConnected()) {
904                         if (targetZone == myZone)
905                                 log_done = false;
906
907                         continue;
908                 }
909
910                 log_done = true;
911
912                 /* don't relay the message to the zone through more than one endpoint unless this is our own zone */
913                 if (relayed && targetZone != myZone) {
914                         skippedEndpoints.push_back(endpoint);
915                         continue;
916                 }
917
918                 /* don't relay messages back to the endpoint which we got the message from */
919                 if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
920                         skippedEndpoints.push_back(endpoint);
921                         continue;
922                 }
923
924                 /* don't relay messages back to the zone which we got the message from */
925                 if (origin && origin->FromZone && targetZone == origin->FromZone) {
926                         skippedEndpoints.push_back(endpoint);
927                         continue;
928                 }
929
930                 /* only relay message to the master if we're not currently the master */
931                 if (currentMaster != myEndpoint && currentMaster != endpoint) {
932                         skippedEndpoints.push_back(endpoint);
933                         continue;
934                 }
935
936                 relayed = true;
937
938                 SyncSendMessage(endpoint, message);
939         }
940
941         if (!skippedEndpoints.empty()) {
942                 double ts = message->Get("ts");
943
944                 for (const Endpoint::Ptr& endpoint : skippedEndpoints)
945                         endpoint->SetLocalLogPosition(ts);
946         }
947
948         return !log_needed || log_done;
949 }
950
951 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
952         const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
953 {
954         double ts = Utility::GetTime();
955         message->Set("ts", ts);
956
957         Log(LogNotice, "ApiListener")
958                 << "Relaying '" << message->Get("method") << "' message";
959
960         if (origin && origin->FromZone)
961                 message->Set("originZone", origin->FromZone->GetName());
962
963         Zone::Ptr target_zone;
964
965         if (secobj) {
966                 if (secobj->GetReflectionType() == Zone::TypeInstance)
967                         target_zone = static_pointer_cast<Zone>(secobj);
968                 else
969                         target_zone = static_pointer_cast<Zone>(secobj->GetZone());
970         }
971
972         if (!target_zone)
973                 target_zone = Zone::GetLocalZone();
974
975         Endpoint::Ptr master = GetMaster();
976
977         bool need_log = !RelayMessageOne(target_zone, origin, message, master);
978
979         for (const Zone::Ptr& zone : target_zone->GetAllParents()) {
980                 if (!RelayMessageOne(zone, origin, message, master))
981                         need_log = true;
982         }
983
984         if (log && need_log)
985                 PersistMessage(message, secobj);
986 }
987
988 /* must hold m_LogLock */
989 void ApiListener::OpenLogFile(void)
990 {
991         String path = GetApiDir() + "log/current";
992
993         Utility::MkDirP(Utility::DirName(path), 0750);
994
995         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
996
997         if (!fp->good()) {
998                 Log(LogWarning, "ApiListener")
999                         << "Could not open spool file: " << path;
1000                 return;
1001         }
1002
1003         m_LogFile = new StdioStream(fp, true);
1004         m_LogMessageCount = 0;
1005         SetLogMessageTimestamp(Utility::GetTime());
1006 }
1007
1008 /* must hold m_LogLock */
1009 void ApiListener::CloseLogFile(void)
1010 {
1011         if (!m_LogFile)
1012                 return;
1013
1014         m_LogFile->Close();
1015         m_LogFile.reset();
1016 }
1017
1018 /* must hold m_LogLock */
1019 void ApiListener::RotateLogFile(void)
1020 {
1021         double ts = GetLogMessageTimestamp();
1022
1023         if (ts == 0)
1024                 ts = Utility::GetTime();
1025
1026         String oldpath = GetApiDir() + "log/current";
1027         String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1028         (void) rename(oldpath.CStr(), newpath.CStr());
1029 }
1030
1031 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1032 {
1033         String name = Utility::BaseName(file);
1034
1035         if (name == "current")
1036                 return;
1037
1038         int ts;
1039
1040         try {
1041                 ts = Convert::ToLong(name);
1042         } catch (const std::exception&) {
1043                 return;
1044         }
1045
1046         files.push_back(ts);
1047 }
1048
1049 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
1050 {
1051         Endpoint::Ptr endpoint = client->GetEndpoint();
1052
1053         if (endpoint->GetLogDuration() == 0) {
1054                 ObjectLock olock2(endpoint);
1055                 endpoint->SetSyncing(false);
1056                 return;
1057         }
1058
1059         CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
1060
1061         int count = -1;
1062         double peer_ts = endpoint->GetLocalLogPosition();
1063         double logpos_ts = peer_ts;
1064         bool last_sync = false;
1065
1066         Endpoint::Ptr target_endpoint = client->GetEndpoint();
1067         ASSERT(target_endpoint);
1068
1069         Zone::Ptr target_zone = target_endpoint->GetZone();
1070
1071         if (!target_zone) {
1072                 ObjectLock olock2(endpoint);
1073                 endpoint->SetSyncing(false);
1074                 return;
1075         }
1076
1077         for (;;) {
1078                 boost::mutex::scoped_lock lock(m_LogLock);
1079
1080                 CloseLogFile();
1081                 RotateLogFile();
1082
1083                 if (count == -1 || count > 50000) {
1084                         OpenLogFile();
1085                         lock.unlock();
1086                 } else {
1087                         last_sync = true;
1088                 }
1089
1090                 count = 0;
1091
1092                 std::vector<int> files;
1093                 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
1094                 std::sort(files.begin(), files.end());
1095
1096                 for (int ts : files) {
1097                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
1098
1099                         if (ts < peer_ts)
1100                                 continue;
1101
1102                         Log(LogNotice, "ApiListener")
1103                                 << "Replaying log: " << path;
1104
1105                         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
1106                         StdioStream::Ptr logStream = new StdioStream(fp, true);
1107
1108                         String message;
1109                         StreamReadContext src;
1110                         while (true) {
1111                                 Dictionary::Ptr pmessage;
1112
1113                                 try {
1114                                         StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
1115
1116                                         if (srs == StatusEof)
1117                                                 break;
1118
1119                                         if (srs != StatusNewItem)
1120                                                 continue;
1121
1122                                         pmessage = JsonDecode(message);
1123                                 } catch (const std::exception&) {
1124                                         Log(LogWarning, "ApiListener")
1125                                                 << "Unexpected end-of-file for cluster log: " << path;
1126
1127                                         /* Log files may be incomplete or corrupted. This is perfectly OK. */
1128                                         break;
1129                                 }
1130
1131                                 if (pmessage->Get("timestamp") <= peer_ts)
1132                                         continue;
1133
1134                                 Dictionary::Ptr secname = pmessage->Get("secobj");
1135
1136                                 if (secname) {
1137                                         ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));
1138
1139                                         if (!secobj)
1140                                                 continue;
1141
1142                                         if (!target_zone->CanAccessObject(secobj))
1143                                                 continue;
1144                                 }
1145
1146                                 try  {
1147                                         size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
1148                                         endpoint->AddMessageSent(bytesSent);
1149                                         count++;
1150                                 } catch (const std::exception& ex) {
1151                                         Log(LogWarning, "ApiListener")
1152                                                 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
1153
1154                                         Log(LogDebug, "ApiListener")
1155                                                 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
1156
1157                                         break;
1158                                 }
1159
1160                                 peer_ts = pmessage->Get("timestamp");
1161
1162                                 if (ts > logpos_ts + 10) {
1163                                         logpos_ts = ts;
1164
1165                                         Dictionary::Ptr lparams = new Dictionary();
1166                                         lparams->Set("log_position", logpos_ts);
1167
1168                                         Dictionary::Ptr lmessage = new Dictionary();
1169                                         lmessage->Set("jsonrpc", "2.0");
1170                                         lmessage->Set("method", "log::SetLogPosition");
1171                                         lmessage->Set("params", lparams);
1172
1173                                         size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
1174                                         endpoint->AddMessageSent(bytesSent);
1175                                 }
1176                         }
1177
1178                         logStream->Close();
1179                 }
1180
1181                 if (count > 0) {
1182                         Log(LogInformation, "ApiListener")
1183                                 << "Replayed " << count << " messages.";
1184                 }
1185
1186                 Log(LogNotice, "ApiListener")
1187                         << "Replayed " << count << " messages.";
1188
1189                 if (last_sync) {
1190                         {
1191                                 ObjectLock olock2(endpoint);
1192                                 endpoint->SetSyncing(false);
1193                         }
1194
1195                         OpenLogFile();
1196
1197                         break;
1198                 }
1199         }
1200 }
1201
1202 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1203 {
1204         std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1205
1206         ApiListener::Ptr listener = ApiListener::GetInstance();
1207
1208         if (!listener)
1209                 return;
1210
1211         stats = listener->GetStatus();
1212
1213         ObjectLock olock(stats.second);
1214         for (const Dictionary::Pair& kv : stats.second)
1215                 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1216
1217         status->Set("api", stats.first);
1218 }
1219
1220 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
1221 {
1222         Dictionary::Ptr status = new Dictionary();
1223         Dictionary::Ptr perfdata = new Dictionary();
1224
1225         /* cluster stats */
1226         status->Set("identity", GetIdentity());
1227
1228         double allEndpoints = 0;
1229         Array::Ptr allNotConnectedEndpoints = new Array();
1230         Array::Ptr allConnectedEndpoints = new Array();
1231
1232         Zone::Ptr my_zone = Zone::GetLocalZone();
1233
1234         Dictionary::Ptr connectedZones = new Dictionary();
1235
1236         for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1237                 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1238                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1239                         Log(LogDebug, "ApiListener")
1240                                 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
1241                         continue;
1242                 }
1243
1244                 bool zoneConnected = false;
1245                 int countZoneEndpoints = 0;
1246                 double zoneLag = 0;
1247
1248                 Array::Ptr zoneEndpoints = new Array();
1249
1250                 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1251                         zoneEndpoints->Add(endpoint->GetName());
1252
1253                         if (endpoint->GetName() == GetIdentity())
1254                                 continue;
1255
1256                         double eplag = CalculateZoneLag(endpoint);
1257
1258                         if (eplag > 0 && eplag > zoneLag)
1259                                 zoneLag = eplag;
1260
1261                         allEndpoints++;
1262                         countZoneEndpoints++;
1263
1264                         if (!endpoint->GetConnected()) {
1265                                 allNotConnectedEndpoints->Add(endpoint->GetName());
1266                         } else {
1267                                 allConnectedEndpoints->Add(endpoint->GetName());
1268                                 zoneConnected = true;
1269                         }
1270                 }
1271
1272                 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1273                 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1274                         zoneConnected = true;
1275
1276                 Dictionary::Ptr zoneStats = new Dictionary();
1277                 zoneStats->Set("connected", zoneConnected);
1278                 zoneStats->Set("client_log_lag", zoneLag);
1279                 zoneStats->Set("endpoints", zoneEndpoints);
1280
1281                 String parentZoneName;
1282                 Zone::Ptr parentZone = zone->GetParent();
1283                 if (parentZone)
1284                         parentZoneName = parentZone->GetName();
1285
1286                 zoneStats->Set("parent_zone", parentZoneName);
1287
1288                 connectedZones->Set(zone->GetName(), zoneStats);
1289         }
1290
1291         status->Set("num_endpoints", allEndpoints);
1292         status->Set("num_conn_endpoints", allConnectedEndpoints->GetLength());
1293         status->Set("num_not_conn_endpoints", allNotConnectedEndpoints->GetLength());
1294         status->Set("conn_endpoints", allConnectedEndpoints);
1295         status->Set("not_conn_endpoints", allNotConnectedEndpoints);
1296
1297         status->Set("zones", connectedZones);
1298
1299         /* connection stats */
1300         size_t jsonRpcClients = GetAnonymousClients().size();
1301         size_t httpClients = GetHttpClients().size();
1302         size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
1303         size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
1304         size_t syncQueueItems = m_SyncQueue.GetLength();
1305         size_t relayQueueItems = m_RelayQueue.GetLength();
1306         double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
1307         double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1308         double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1309
1310         Dictionary::Ptr jsonRpc = new Dictionary();
1311         jsonRpc->Set("clients", jsonRpcClients);
1312         jsonRpc->Set("work_queue_items", workQueueItems);
1313         jsonRpc->Set("work_queue_count", workQueueCount);
1314         jsonRpc->Set("sync_queue_items", syncQueueItems);
1315         jsonRpc->Set("relay_queue_items", relayQueueItems);
1316
1317         jsonRpc->Set("work_queue_item_rate", workQueueItemRate);
1318         jsonRpc->Set("sync_queue_item_rate", syncQueueItemRate);
1319         jsonRpc->Set("relay_queue_item_rate", relayQueueItemRate);
1320
1321         Dictionary::Ptr http = new Dictionary();
1322         http->Set("clients", httpClients);
1323
1324         status->Set("json_rpc", jsonRpc);
1325         status->Set("http", http);
1326
1327         /* performance data */
1328         perfdata->Set("num_endpoints", allEndpoints);
1329         perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1330         perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1331
1332         perfdata->Set("num_json_rpc_clients", jsonRpcClients);
1333         perfdata->Set("num_http_clients", httpClients);
1334         perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
1335         perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
1336         perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1337         perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1338
1339         perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1340         perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1341         perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1342
1343         return std::make_pair(status, perfdata);
1344 }
1345
1346 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1347 {
1348         double remoteLogPosition = endpoint->GetRemoteLogPosition();
1349         double eplag = Utility::GetTime() - remoteLogPosition;
1350
1351         if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
1352                 return eplag;
1353
1354         return 0;
1355 }
1356
1357 void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1358 {
1359         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1360         m_AnonymousClients.insert(aclient);
1361 }
1362
1363 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1364 {
1365         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1366         m_AnonymousClients.erase(aclient);
1367 }
1368
1369 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
1370 {
1371         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1372         return m_AnonymousClients;
1373 }
1374
1375 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1376 {
1377         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1378         m_HttpClients.insert(aclient);
1379 }
1380
1381 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1382 {
1383         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1384         m_HttpClients.erase(aclient);
1385 }
1386
1387 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
1388 {
1389         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1390         return m_HttpClients;
1391 }
1392
1393 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
1394 {
1395         return Empty;
1396 }
1397
1398 Endpoint::Ptr ApiListener::GetLocalEndpoint(void) const
1399 {
1400         return m_LocalEndpoint;
1401 }
1402
1403 void ApiListener::ValidateTlsProtocolmin(const String& value, const ValidationUtils& utils)
1404 {
1405         ObjectImpl<ApiListener>::ValidateTlsProtocolmin(value, utils);
1406
1407         if (value != SSL_TXT_TLSV1
1408 #ifdef SSL_TXT_TLSV1_1
1409                 && value != SSL_TXT_TLSV1_1 &&
1410                 value != SSL_TXT_TLSV1_2
1411 #endif /* SSL_TXT_TLSV1_1 */
1412                 ) {
1413                 String message = "Invalid TLS version. Must be one of '" SSL_TXT_TLSV1 "'";
1414 #ifdef SSL_TXT_TLSV1_1
1415                 message += ", '" SSL_TXT_TLSV1_1 "' or '" SSL_TXT_TLSV1_2 "'";
1416 #endif /* SSL_TXT_TLSV1_1 */
1417
1418                 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, message));
1419         }
1420 }
1421
1422 bool ApiListener::IsHACluster(void)
1423 {
1424         Zone::Ptr zone = Zone::GetLocalZone();
1425
1426         if (!zone)
1427                 return false;
1428
1429         return zone->IsSingleInstance();
1430 }
1431
1432 /* Provide a helper function for zone origin name. */
1433 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1434 {
1435         String fromZoneName;
1436
1437         if (fromZone) {
1438                 fromZoneName = fromZone->GetName();
1439         } else {
1440                 Zone::Ptr lzone = Zone::GetLocalZone();
1441
1442                 if (lzone)
1443                         fromZoneName = lzone->GetName();
1444         }
1445
1446         return fromZoneName;
1447 }