]> granicus.if.org Git - icinga2/blob - lib/remote/apilistener.cpp
Merge pull request #7591 from Icinga/feature/docs-api-joins
[icinga2] / lib / remote / apilistener.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "remote/apilistener.hpp"
4 #include "remote/apilistener-ti.cpp"
5 #include "remote/jsonrpcconnection.hpp"
6 #include "remote/endpoint.hpp"
7 #include "remote/jsonrpc.hpp"
8 #include "remote/apifunction.hpp"
9 #include "remote/configpackageutility.hpp"
10 #include "remote/configobjectutility.hpp"
11 #include "base/convert.hpp"
12 #include "base/defer.hpp"
13 #include "base/io-engine.hpp"
14 #include "base/netstring.hpp"
15 #include "base/json.hpp"
16 #include "base/configtype.hpp"
17 #include "base/logger.hpp"
18 #include "base/objectlock.hpp"
19 #include "base/stdiostream.hpp"
20 #include "base/perfdatavalue.hpp"
21 #include "base/application.hpp"
22 #include "base/context.hpp"
23 #include "base/statsfunction.hpp"
24 #include "base/exception.hpp"
25 #include "base/tcpsocket.hpp"
26 #include <boost/asio/buffer.hpp>
27 #include <boost/asio/ip/tcp.hpp>
28 #include <boost/asio/spawn.hpp>
29 #include <boost/asio/ssl/context.hpp>
30 #include <boost/system/error_code.hpp>
31 #include <climits>
32 #include <fstream>
33 #include <memory>
34 #include <openssl/ssl.h>
35 #include <openssl/tls1.h>
36 #include <openssl/x509.h>
37 #include <sstream>
38 #include <utility>
39
40 using namespace icinga;
41
/* Registers the ApiListener class through the project's REGISTER_TYPE macro. */
42 REGISTER_TYPE(ApiListener);
43
/* Signal carrying a single bool argument; Start() emits it with `true`. */
44 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
/* The one ApiListener object: assigned in OnConfigLoaded() (which rejects a
 * second instance) and handed out by GetInstance(). */
45 ApiListener::Ptr ApiListener::m_Instance;
46
/* Registers ApiListener::StatsFunc through the REGISTER_STATSFUNCTION macro. */
47 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
48
/* Registers HelloAPIHandler as an API function.
 * NOTE(review): presumably this is the handler for the "icinga::Hello" message
 * that NewClientHandlerInternal() sends in the client role - confirm against
 * the REGISTER_APIFUNCTION macro definition. */
49 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
50
51 ApiListener::ApiListener()
52 {
53         m_RelayQueue.SetName("ApiListener, RelayQueue");
54         m_SyncQueue.SetName("ApiListener, SyncQueue");
55 }
56
57 String ApiListener::GetApiDir()
58 {
59         return Configuration::DataDir + "/api/";
60 }
61
62 String ApiListener::GetApiZonesDir()
63 {
64         return GetApiDir() + "zones/";
65 }
66
67 String ApiListener::GetApiZonesStageDir()
68 {
69         return GetApiDir() + "zones-stage/";
70 }
71
72 String ApiListener::GetCertsDir()
73 {
74         return Configuration::DataDir + "/certs/";
75 }
76
77 String ApiListener::GetCaDir()
78 {
79         return Configuration::DataDir + "/ca/";
80 }
81
82 String ApiListener::GetCertificateRequestsDir()
83 {
84         return Configuration::DataDir + "/certificate-requests/";
85 }
86
87 String ApiListener::GetDefaultCertPath()
88 {
89         return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".crt";
90 }
91
92 String ApiListener::GetDefaultKeyPath()
93 {
94         return GetCertsDir() + "/" + ScriptGlobal::Get("NodeName") + ".key";
95 }
96
97 String ApiListener::GetDefaultCaPath()
98 {
99         return GetCertsDir() + "/ca.crt";
100 }
101
102 double ApiListener::GetTlsHandshakeTimeout() const
103 {
104         return Configuration::TlsHandshakeTimeout;
105 }
106
107 void ApiListener::SetTlsHandshakeTimeout(double value, bool suppress_events, const Value& cookie)
108 {
109         Configuration::TlsHandshakeTimeout = value;
110 }
111
112 void ApiListener::CopyCertificateFile(const String& oldCertPath, const String& newCertPath)
113 {
114         struct stat st1, st2;
115
116         if (!oldCertPath.IsEmpty() && stat(oldCertPath.CStr(), &st1) >= 0 && (stat(newCertPath.CStr(), &st2) < 0 || st1.st_mtime > st2.st_mtime)) {
117                 Log(LogWarning, "ApiListener")
118                         << "Copying '" << oldCertPath << "' certificate file to '" << newCertPath << "'";
119
120                 Utility::MkDirP(Utility::DirName(newCertPath), 0700);
121                 Utility::CopyFile(oldCertPath, newCertPath);
122         }
123 }
124
125 void ApiListener::OnConfigLoaded()
126 {
127         if (m_Instance)
128                 BOOST_THROW_EXCEPTION(ScriptError("Only one ApiListener object is allowed.", GetDebugInfo()));
129
130         m_Instance = this;
131
132         String defaultCertPath = GetDefaultCertPath();
133         String defaultKeyPath = GetDefaultKeyPath();
134         String defaultCaPath = GetDefaultCaPath();
135
136         /* Migrate certificate location < 2.8 to the new default path. */
137         String oldCertPath = GetCertPath();
138         String oldKeyPath = GetKeyPath();
139         String oldCaPath = GetCaPath();
140
141         CopyCertificateFile(oldCertPath, defaultCertPath);
142         CopyCertificateFile(oldKeyPath, defaultKeyPath);
143         CopyCertificateFile(oldCaPath, defaultCaPath);
144
145         if (!oldCertPath.IsEmpty() && !oldKeyPath.IsEmpty() && !oldCaPath.IsEmpty()) {
146                 Log(LogWarning, "ApiListener", "Please read the upgrading documentation for v2.8: https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/");
147         }
148
149         /* Create the internal API object storage. */
150         ConfigObjectUtility::CreateStorage();
151
152         /* Cache API packages and their active stage name. */
153         UpdateActivePackageStagesCache();
154
155         /* set up SSL context */
156         std::shared_ptr<X509> cert;
157         try {
158                 cert = GetX509Certificate(defaultCertPath);
159         } catch (const std::exception&) {
160                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
161                         + defaultCertPath + "'.", GetDebugInfo()));
162         }
163
164         try {
165                 SetIdentity(GetCertificateCN(cert));
166         } catch (const std::exception&) {
167                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
168                         + defaultCertPath + "'.", GetDebugInfo()));
169         }
170
171         Log(LogInformation, "ApiListener")
172                 << "My API identity: " << GetIdentity();
173
174         UpdateSSLContext();
175 }
176
177 void ApiListener::UpdateSSLContext()
178 {
179         namespace ssl = boost::asio::ssl;
180
181         Shared<ssl::context>::Ptr context;
182
183         try {
184                 context = MakeAsioSslContext(GetDefaultCertPath(), GetDefaultKeyPath(), GetDefaultCaPath());
185         } catch (const std::exception&) {
186                 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
187                         + GetDefaultCertPath() + "' key path: '" + GetDefaultKeyPath() + "' ca path: '" + GetDefaultCaPath() + "'.", GetDebugInfo()));
188         }
189
190         if (!GetCrlPath().IsEmpty()) {
191                 try {
192                         AddCRLToSSLContext(context, GetCrlPath());
193                 } catch (const std::exception&) {
194                         BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
195                                 + GetCrlPath() + "'.", GetDebugInfo()));
196                 }
197         }
198
199         if (!GetCipherList().IsEmpty()) {
200                 try {
201                         SetCipherListToSSLContext(context, GetCipherList());
202                 } catch (const std::exception&) {
203                         BOOST_THROW_EXCEPTION(ScriptError("Cannot set cipher list to SSL context for cipher list: '"
204                                 + GetCipherList() + "'.", GetDebugInfo()));
205                 }
206         }
207
208         if (!GetTlsProtocolmin().IsEmpty()){
209                 try {
210                         SetTlsProtocolminToSSLContext(context, GetTlsProtocolmin());
211                 } catch (const std::exception&) {
212                         BOOST_THROW_EXCEPTION(ScriptError("Cannot set minimum TLS protocol version to SSL context with tls_protocolmin: '" + GetTlsProtocolmin() + "'.", GetDebugInfo()));
213                 }
214         }
215
216         m_SSLContext = context;
217
218         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
219                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
220                         client->Disconnect();
221                 }
222         }
223
224         for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
225                 client->Disconnect();
226         }
227 }
228
229 void ApiListener::OnAllConfigLoaded()
230 {
231         m_LocalEndpoint = Endpoint::GetByName(GetIdentity());
232
233         if (!m_LocalEndpoint)
234                 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
235 }
236
237 /**
238  * Starts the component.
239  */
240 void ApiListener::Start(bool runtimeCreated)
241 {
242         Log(LogInformation, "ApiListener")
243                 << "'" << GetName() << "' started.";
244
245         SyncLocalZoneDirs();
246
247         ObjectImpl<ApiListener>::Start(runtimeCreated);
248
249         {
250                 boost::mutex::scoped_lock lock(m_LogLock);
251                 OpenLogFile();
252         }
253
254         /* create the primary JSON-RPC listener */
255         if (!AddListener(GetBindHost(), GetBindPort())) {
256                 Log(LogCritical, "ApiListener")
257                         << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
258                 Application::Exit(EXIT_FAILURE);
259         }
260
261         m_Timer = new Timer();
262         m_Timer->OnTimerExpired.connect(std::bind(&ApiListener::ApiTimerHandler, this));
263         m_Timer->SetInterval(5);
264         m_Timer->Start();
265         m_Timer->Reschedule(0);
266
267         m_ReconnectTimer = new Timer();
268         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&ApiListener::ApiReconnectTimerHandler, this));
269         m_ReconnectTimer->SetInterval(10);
270         m_ReconnectTimer->Start();
271         m_ReconnectTimer->Reschedule(0);
272
273         /* Keep this in relative sync with the cold startup in UpdateObjectAuthority() and the reconnect interval above.
274          * Previous: 60s reconnect, 30s OA, 60s cold startup.
275          * Now: 10s reconnect, 10s OA, 30s cold startup. 
276          */
277         m_AuthorityTimer = new Timer();
278         m_AuthorityTimer->OnTimerExpired.connect(std::bind(&ApiListener::UpdateObjectAuthority));
279         m_AuthorityTimer->SetInterval(10);
280         m_AuthorityTimer->Start();
281
282         m_CleanupCertificateRequestsTimer = new Timer();
283         m_CleanupCertificateRequestsTimer->OnTimerExpired.connect(std::bind(&ApiListener::CleanupCertificateRequestsTimerHandler, this));
284         m_CleanupCertificateRequestsTimer->SetInterval(3600);
285         m_CleanupCertificateRequestsTimer->Start();
286         m_CleanupCertificateRequestsTimer->Reschedule(0);
287
288         m_ApiPackageIntegrityTimer = new Timer();
289         m_ApiPackageIntegrityTimer->OnTimerExpired.connect(std::bind(&ApiListener::CheckApiPackageIntegrity, this));
290         m_ApiPackageIntegrityTimer->SetInterval(300);
291         m_ApiPackageIntegrityTimer->Start();
292
293         OnMasterChanged(true);
294 }
295
296 void ApiListener::Stop(bool runtimeDeleted)
297 {
298         ObjectImpl<ApiListener>::Stop(runtimeDeleted);
299
300         Log(LogInformation, "ApiListener")
301                 << "'" << GetName() << "' stopped.";
302
303         {
304                 boost::mutex::scoped_lock lock(m_LogLock);
305                 CloseLogFile();
306                 RotateLogFile();
307         }
308
309         RemoveStatusFile();
310 }
311
312 ApiListener::Ptr ApiListener::GetInstance()
313 {
314         return m_Instance;
315 }
316
317 Endpoint::Ptr ApiListener::GetMaster() const
318 {
319         Zone::Ptr zone = Zone::GetLocalZone();
320
321         if (!zone)
322                 return nullptr;
323
324         std::vector<String> names;
325
326         for (const Endpoint::Ptr& endpoint : zone->GetEndpoints())
327                 if (endpoint->GetConnected() || endpoint->GetName() == GetIdentity())
328                         names.push_back(endpoint->GetName());
329
330         std::sort(names.begin(), names.end());
331
332         return Endpoint::GetByName(*names.begin());
333 }
334
335 bool ApiListener::IsMaster() const
336 {
337         Endpoint::Ptr master = GetMaster();
338
339         if (!master)
340                 return false;
341
342         return master == GetLocalEndpoint();
343 }
344
345 /**
346  * Creates a new JSON-RPC listener on the specified port.
347  *
348  * @param node The host the listener should be bound to.
349  * @param service The port to listen on.
350  */
351 bool ApiListener::AddListener(const String& node, const String& service)
352 {
353         namespace asio = boost::asio;
354         namespace ip = asio::ip;
355         using ip::tcp;
356
357         ObjectLock olock(this);
358
359         auto sslContext (m_SSLContext);
360
361         if (!sslContext) {
362                 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
363                 return false;
364         }
365
366         auto& io (IoEngine::Get().GetIoContext());
367         auto acceptor (Shared<tcp::acceptor>::Make(io));
368
369         try {
370                 tcp::resolver resolver (io);
371                 tcp::resolver::query query (node, service, tcp::resolver::query::passive);
372
373                 auto result (resolver.resolve(query));
374                 auto current (result.begin());
375
376                 for (;;) {
377                         try {
378                                 acceptor->open(current->endpoint().protocol());
379
380                                 {
381                                         auto fd (acceptor->native_handle());
382
383                                         const int optFalse = 0;
384                                         setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse));
385
386                                         const int optTrue = 1;
387                                         setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue));
388 #ifndef _WIN32
389                                         setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, reinterpret_cast<const char *>(&optTrue), sizeof(optTrue));
390 #endif /* _WIN32 */
391                                 }
392
393                                 acceptor->bind(current->endpoint());
394
395                                 break;
396                         } catch (const std::exception&) {
397                                 if (++current == result.end()) {
398                                         throw;
399                                 }
400
401                                 if (acceptor->is_open()) {
402                                         acceptor->close();
403                                 }
404                         }
405                 }
406         } catch (const std::exception& ex) {
407                 Log(LogCritical, "ApiListener")
408                         << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "': " << ex.what();
409                 return false;
410         }
411
412         acceptor->listen(INT_MAX);
413
414         auto localEndpoint (acceptor->local_endpoint());
415
416         Log(LogInformation, "ApiListener")
417                 << "Started new listener on '[" << localEndpoint.address() << "]:" << localEndpoint.port() << "'";
418
419         IoEngine::SpawnCoroutine(io, [this, acceptor, sslContext](asio::yield_context yc) { ListenerCoroutineProc(yc, acceptor, sslContext); });
420
421         UpdateStatusFile(localEndpoint);
422
423         return true;
424 }
425
426 void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Shared<boost::asio::ip::tcp::acceptor>::Ptr& server, const Shared<boost::asio::ssl::context>::Ptr& sslContext)
427 {
428         namespace asio = boost::asio;
429
430         auto& io (IoEngine::Get().GetIoContext());
431
432         for (;;) {
433                 try {
434                         auto sslConn (Shared<AsioTlsStream>::Make(io, *sslContext));
435
436                         server->async_accept(sslConn->lowest_layer(), yc);
437
438                         IoEngine::SpawnCoroutine(io, [this, sslConn](asio::yield_context yc) { NewClientHandler(yc, sslConn, String(), RoleServer); });
439                 } catch (const std::exception& ex) {
440                         Log(LogCritical, "ApiListener")
441                                 << "Cannot accept new connection: " << ex.what();
442                 }
443         }
444 }
445
446 /**
447  * Creates a new JSON-RPC client and connects to the specified endpoint.
448  *
449  * @param endpoint The endpoint.
450  */
451 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
452 {
453         namespace asio = boost::asio;
454         using asio::ip::tcp;
455
456         auto sslContext (m_SSLContext);
457
458         if (!sslContext) {
459                 Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
460                 return;
461         }
462
463         auto& io (IoEngine::Get().GetIoContext());
464
465         IoEngine::SpawnCoroutine(io, [this, endpoint, &io, sslContext](asio::yield_context yc) {
466                 String host = endpoint->GetHost();
467                 String port = endpoint->GetPort();
468
469                 Log(LogInformation, "ApiListener")
470                         << "Reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
471
472                 try {
473                         auto sslConn (Shared<AsioTlsStream>::Make(io, *sslContext, endpoint->GetName()));
474
475                         Connect(sslConn->lowest_layer(), host, port, yc);
476
477                         NewClientHandler(yc, sslConn, endpoint->GetName(), RoleClient);
478
479                         endpoint->SetConnecting(false);
480                         Log(LogInformation, "ApiListener")
481                                 << "Finished reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
482                 } catch (const std::exception& ex) {
483                         endpoint->SetConnecting(false);
484
485                         Log(LogCritical, "ApiListener")
486                                 << "Cannot connect to host '" << host << "' on port '" << port << "': " << ex.what();
487                 }
488         });
489 }
490
491 void ApiListener::NewClientHandler(boost::asio::yield_context yc, const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role)
492 {
493         try {
494                 NewClientHandlerInternal(yc, client, hostname, role);
495         } catch (const std::exception& ex) {
496                 Log(LogCritical, "ApiListener")
497                         << "Exception while handling new API client connection: " << DiagnosticInformation(ex, false);
498
499                 Log(LogDebug, "ApiListener")
500                         << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
501         }
502 }
503
504 /**
505  * Processes a new client connection.
506  *
507  * @param client The new client.
508  */
509 void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const Shared<AsioTlsStream>::Ptr& client, const String& hostname, ConnectionRole role)
510 {
511         namespace asio = boost::asio;
512         namespace ssl = asio::ssl;
513
514         String conninfo;
515
516         {
517                 std::ostringstream conninfo_;
518
519                 if (role == RoleClient) {
520                         conninfo_ << "to";
521                 } else {
522                         conninfo_ << "from";
523                 }
524
525                 auto endpoint (client->lowest_layer().remote_endpoint());
526
527                 conninfo_ << " [" << endpoint.address() << "]:" << endpoint.port();
528
529                 conninfo = conninfo_.str();
530         }
531
532         auto& sslConn (client->next_layer());
533
534         boost::system::error_code ec;
535
536         sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);
537
538         if (ec) {
539                 // https://github.com/boostorg/beast/issues/915
540                 // Google Chrome 73+ seems not close the connection properly, https://stackoverflow.com/questions/56272906/how-to-fix-certificate-unknown-error-from-chrome-v73
541                 if (ec == asio::ssl::error::stream_truncated) {
542                         Log(LogNotice, "ApiListener")
543                                 << "TLS stream was truncated, ignoring connection from " << conninfo;
544                         return;
545                 }
546
547                 Log(LogCritical, "ApiListener")
548                         << "Client TLS handshake failed (" << conninfo << "): " << ec.message();
549                 return;
550         }
551
552         bool willBeShutDown = false;
553
554         Defer shutDownIfNeeded ([&sslConn, &willBeShutDown, &yc]() {
555                 if (!willBeShutDown) {
556                         // Ignore the error, but do not throw an exception being swallowed at all cost.
557                         // https://github.com/Icinga/icinga2/issues/7351
558                         boost::system::error_code ec;
559                         sslConn.async_shutdown(yc[ec]);
560                 }
561         });
562
563         std::shared_ptr<X509> cert (sslConn.GetPeerCertificate());
564         bool verify_ok = false;
565         String identity;
566         Endpoint::Ptr endpoint;
567
568         if (cert) {
569                 verify_ok = sslConn.IsVerifyOK();
570
571                 String verifyError = sslConn.GetVerifyError();
572
573                 try {
574                         identity = GetCertificateCN(cert);
575                 } catch (const std::exception&) {
576                         Log(LogCritical, "ApiListener")
577                                 << "Cannot get certificate common name from cert path: '" << GetDefaultCertPath() << "'.";
578                         return;
579                 }
580
581                 if (!hostname.IsEmpty()) {
582                         if (identity != hostname) {
583                                 Log(LogWarning, "ApiListener")
584                                         << "Unexpected certificate common name while connecting to endpoint '"
585                                         << hostname << "': got '" << identity << "'";
586                                 return;
587                         } else if (!verify_ok) {
588                                 Log(LogWarning, "ApiListener")
589                                         << "Certificate validation failed for endpoint '" << hostname
590                                         << "': " << verifyError;
591                         }
592                 }
593
594                 if (verify_ok) {
595                         endpoint = Endpoint::GetByName(identity);
596                 }
597
598                 Log log(LogInformation, "ApiListener");
599
600                 log << "New client connection for identity '" << identity << "' " << conninfo;
601
602                 if (!verify_ok) {
603                         log << " (certificate validation failed: " << verifyError << ")";
604                 } else if (!endpoint) {
605                         log << " (no Endpoint object found for identity)";
606                 }
607         } else {
608                 Log(LogInformation, "ApiListener")
609                         << "New client connection " << conninfo << " (no client certificate)";
610         }
611
612         ClientType ctype;
613
614         if (role == RoleClient) {
615                 JsonRpc::SendMessage(client, new Dictionary({
616                         { "jsonrpc", "2.0" },
617                         { "method", "icinga::Hello" },
618                         { "params", new Dictionary() }
619                 }), yc);
620
621                 client->async_flush(yc);
622
623                 ctype = ClientJsonRpc;
624         } else {
625                 {
626                         boost::system::error_code ec;
627
628                         if (client->async_fill(yc[ec]) == 0u) {
629                                 if (identity.IsEmpty()) {
630                                         Log(LogInformation, "ApiListener")
631                                                 << "No data received on new API connection " << conninfo << ". "
632                                                 << "Ensure that the remote endpoints are properly configured in a cluster setup.";
633                                 } else {
634                                         Log(LogWarning, "ApiListener")
635                                                 << "No data received on new API connection " << conninfo << " for identity '" << identity << "'. "
636                                                 << "Ensure that the remote endpoints are properly configured in a cluster setup.";
637                                 }
638
639                                 return;
640                         }
641                 }
642
643                 char firstByte = 0;
644
645                 {
646                         asio::mutable_buffer firstByteBuf (&firstByte, 1);
647                         client->peek(firstByteBuf);
648                 }
649
650                 if (firstByte >= '0' && firstByte <= '9') {
651                         ctype = ClientJsonRpc;
652                 } else {
653                         ctype = ClientHttp;
654                 }
655         }
656
657         if (ctype == ClientJsonRpc) {
658                 Log(LogNotice, "ApiListener", "New JSON-RPC client");
659
660                 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
661
662                 if (endpoint) {
663                         bool needSync = !endpoint->GetConnected();
664
665                         endpoint->AddClient(aclient);
666
667                         IoEngine::SpawnCoroutine(IoEngine::Get().GetIoContext(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
668                                 CpuBoundWork syncClient (yc);
669
670                                 SyncClient(aclient, endpoint, needSync);
671                         });
672
673                 } else if (!AddAnonymousClient(aclient)) {
674                         Log(LogNotice, "ApiListener")
675                                 << "Ignoring anonymous JSON-RPC connection " << conninfo
676                                 << ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
677
678                         aclient = nullptr;
679                 }
680
681                 if (aclient) {
682                         aclient->Start();
683
684                         willBeShutDown = true;
685                 }
686         } else {
687                 Log(LogNotice, "ApiListener", "New HTTP client");
688
689                 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
690                 AddHttpClient(aclient);
691                 aclient->Start();
692
693                 willBeShutDown = true;
694         }
695 }
696
697 void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint, bool needSync)
698 {
699         Zone::Ptr eZone = endpoint->GetZone();
700
701         try {
702                 {
703                         ObjectLock olock(endpoint);
704
705                         endpoint->SetSyncing(true);
706                 }
707
708                 Zone::Ptr myZone = Zone::GetLocalZone();
709
710                 if (myZone->GetParent() == eZone) {
711                         Log(LogInformation, "ApiListener")
712                                 << "Requesting new certificate for this Icinga instance from endpoint '" << endpoint->GetName() << "'.";
713
714                         JsonRpcConnection::SendCertificateRequest(aclient, nullptr, String());
715
716                         if (Utility::PathExists(ApiListener::GetCertificateRequestsDir()))
717                                 Utility::Glob(ApiListener::GetCertificateRequestsDir() + "/*.json", std::bind(&JsonRpcConnection::SendCertificateRequest, aclient, nullptr, _1), GlobFile);
718                 }
719
720                 /* Make sure that the config updates are synced
721                  * before the logs are replayed.
722                  */
723
724                 Log(LogInformation, "ApiListener")
725                         << "Sending config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
726
727                 /* sync zone file config */
728                 SendConfigUpdate(aclient);
729
730                 Log(LogInformation, "ApiListener")
731                         << "Finished sending config file updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
732
733                 /* sync runtime config */
734                 SendRuntimeConfigObjects(aclient);
735
736                 Log(LogInformation, "ApiListener")
737                         << "Finished sending runtime config updates for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
738
739                 if (!needSync) {
740                         ObjectLock olock2(endpoint);
741                         endpoint->SetSyncing(false);
742                         return;
743                 }
744
745                 Log(LogInformation, "ApiListener")
746                         << "Sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
747
748                 ReplayLog(aclient);
749
750                 if (eZone == Zone::GetLocalZone())
751                         UpdateObjectAuthority();
752
753                 Log(LogInformation, "ApiListener")
754                         << "Finished sending replay log for endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
755         } catch (const std::exception& ex) {
756                 {
757                         ObjectLock olock2(endpoint);
758                         endpoint->SetSyncing(false);
759                 }
760
761                 Log(LogCritical, "ApiListener")
762                         << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
763
764                 Log(LogDebug, "ApiListener")
765                         << "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
766         }
767
768         Log(LogInformation, "ApiListener")
769                 << "Finished syncing endpoint '" << endpoint->GetName() << "' in zone '" << eZone->GetName() << "'.";
770 }
771
772 void ApiListener::ApiTimerHandler()
773 {
774         double now = Utility::GetTime();
775
776         std::vector<int> files;
777         Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
778         std::sort(files.begin(), files.end());
779
780         for (int ts : files) {
781                 bool need = false;
782                 auto localZone (GetLocalEndpoint()->GetZone());
783
784                 for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
785                         if (endpoint == GetLocalEndpoint())
786                                 continue;
787
788                         auto zone (endpoint->GetZone());
789
790                         /* only care for endpoints in a) the same zone b) our parent zone c) immediate child zones */
791                         if (!(zone == localZone || zone == localZone->GetParent() || zone->GetParent() == localZone)) {
792                                 continue;
793                         }
794
795                         if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
796                                 continue;
797
798                         if (ts > endpoint->GetLocalLogPosition()) {
799                                 need = true;
800                                 break;
801                         }
802                 }
803
804                 if (!need) {
805                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
806                         Log(LogNotice, "ApiListener")
807                                 << "Removing old log file: " << path;
808                         (void)unlink(path.CStr());
809                 }
810         }
811
812         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
813                 if (!endpoint->GetConnected())
814                         continue;
815
816                 double ts = endpoint->GetRemoteLogPosition();
817
818                 if (ts == 0)
819                         continue;
820
821                 Dictionary::Ptr lmessage = new Dictionary({
822                         { "jsonrpc", "2.0" },
823                         { "method", "log::SetLogPosition" },
824                         { "params", new Dictionary({
825                                 { "log_position", ts }
826                         }) }
827                 });
828
829                 double maxTs = 0;
830
831                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
832                         if (client->GetTimestamp() > maxTs)
833                                 maxTs = client->GetTimestamp();
834                 }
835
836                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
837                         if (client->GetTimestamp() == maxTs) {
838                                 client->SendMessage(lmessage);
839                         } else {
840                                 client->Disconnect();
841                         }
842                 }
843
844                 Log(LogNotice, "ApiListener")
845                         << "Setting log position for identity '" << endpoint->GetName() << "': "
846                         << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
847         }
848 }
849
850 void ApiListener::ApiReconnectTimerHandler()
851 {
852         Zone::Ptr my_zone = Zone::GetLocalZone();
853
854         for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
855                 /* don't connect to global zones */
856                 if (zone->GetGlobal())
857                         continue;
858
859                 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
860                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
861                         Log(LogDebug, "ApiListener")
862                                 << "Not connecting to Zone '" << zone->GetName()
863                                 << "' because it's not in the same zone, a parent or a child zone.";
864                         continue;
865                 }
866
867                 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
868                         /* don't connect to ourselves */
869                         if (endpoint == GetLocalEndpoint()) {
870                                 Log(LogDebug, "ApiListener")
871                                         << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
872                                 continue;
873                         }
874
875                         /* don't try to connect to endpoints which don't have a host and port */
876                         if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
877                                 Log(LogDebug, "ApiListener")
878                                         << "Not connecting to Endpoint '" << endpoint->GetName()
879                                         << "' because the host/port attributes are missing.";
880                                 continue;
881                         }
882
883                         /* don't try to connect if there's already a connection attempt */
884                         if (endpoint->GetConnecting()) {
885                                 Log(LogDebug, "ApiListener")
886                                         << "Not connecting to Endpoint '" << endpoint->GetName()
887                                         << "' because we're already trying to connect to it.";
888                                 continue;
889                         }
890
891                         /* don't try to connect if we're already connected */
892                         if (endpoint->GetConnected()) {
893                                 Log(LogDebug, "ApiListener")
894                                         << "Not connecting to Endpoint '" << endpoint->GetName()
895                                         << "' because we're already connected to it.";
896                                 continue;
897                         }
898
899                         /* Set connecting state to prevent duplicated queue inserts later. */
900                         endpoint->SetConnecting(true);
901
902                         AddConnection(endpoint);
903                 }
904         }
905
906         Endpoint::Ptr master = GetMaster();
907
908         if (master)
909                 Log(LogNotice, "ApiListener")
910                         << "Current zone master: " << master->GetName();
911
912         std::vector<String> names;
913         for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>())
914                 if (endpoint->GetConnected())
915                         names.emplace_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
916
917         Log(LogNotice, "ApiListener")
918                 << "Connected endpoints: " << Utility::NaturalJoin(names);
919 }
920
921 static void CleanupCertificateRequest(const String& path, double expiryTime)
922 {
923 #ifndef _WIN32
924         struct stat statbuf;
925         if (lstat(path.CStr(), &statbuf) < 0)
926                 return;
927 #else /* _WIN32 */
928         struct _stat statbuf;
929         if (_stat(path.CStr(), &statbuf) < 0)
930                 return;
931 #endif /* _WIN32 */
932
933         if (statbuf.st_mtime < expiryTime)
934                 (void) unlink(path.CStr());
935 }
936
937 void ApiListener::CleanupCertificateRequestsTimerHandler()
938 {
939         String requestsDir = GetCertificateRequestsDir();
940
941         if (Utility::PathExists(requestsDir)) {
942                 /* remove certificate requests that are older than a week */
943                 double expiryTime = Utility::GetTime() - 7 * 24 * 60 * 60;
944                 Utility::Glob(requestsDir + "/*.json", std::bind(&CleanupCertificateRequest, _1, expiryTime), GlobFile);
945         }
946 }
947
948 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
949         const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
950 {
951         if (!IsActive())
952                 return;
953
954         m_RelayQueue.Enqueue(std::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
955 }
956
957 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
958 {
959         double ts = message->Get("ts");
960
961         ASSERT(ts != 0);
962
963         Dictionary::Ptr pmessage = new Dictionary();
964         pmessage->Set("timestamp", ts);
965
966         pmessage->Set("message", JsonEncode(message));
967
968         if (secobj) {
969                 Dictionary::Ptr secname = new Dictionary();
970                 secname->Set("type", secobj->GetReflectionType()->GetName());
971                 secname->Set("name", secobj->GetName());
972                 pmessage->Set("secobj", secname);
973         }
974
975         boost::mutex::scoped_lock lock(m_LogLock);
976         if (m_LogFile) {
977                 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
978                 m_LogMessageCount++;
979                 SetLogMessageTimestamp(ts);
980
981                 if (m_LogMessageCount > 50000) {
982                         CloseLogFile();
983                         RotateLogFile();
984                         OpenLogFile();
985                 }
986         }
987 }
988
989 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
990 {
991         ObjectLock olock(endpoint);
992
993         if (!endpoint->GetSyncing()) {
994                 Log(LogNotice, "ApiListener")
995                         << "Sending message '" << message->Get("method") << "' to '" << endpoint->GetName() << "'";
996
997                 double maxTs = 0;
998
999                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
1000                         if (client->GetTimestamp() > maxTs)
1001                                 maxTs = client->GetTimestamp();
1002                 }
1003
1004                 for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
1005                         if (client->GetTimestamp() != maxTs)
1006                                 continue;
1007
1008                         client->SendMessage(message);
1009                 }
1010         }
1011 }
1012
/**
 * Relays a message to the endpoints of a single target zone.
 *
 * Zones which are neither global, our own zone, our parent zone nor a direct
 * child zone are ignored (the function returns true for them). For a global
 * target zone the candidates are the endpoints of our own zone plus those of
 * all immediate child zones; otherwise the endpoints of the target zone.
 *
 * Endpoints that are connected but deliberately not sent to get their local
 * log position set to the message's "ts" value.
 *
 * @param targetZone The zone to relay to; must not be empty.
 * @param origin Where the message came from (may be empty).
 * @param message The message; its "ts" attribute is read for skipped endpoints.
 * @param currentZoneMaster The endpoint currently regarded as zone master.
 * @returns `!log_needed || log_done`, i.e. false if at least one non-local
 *          endpoint was considered and the log_done bookkeeping ended up false.
 *          SyncRelayMessage() treats a false result as "the message needs to be logged".
 */
1013 bool ApiListener::RelayMessageOne(const Zone::Ptr& targetZone, const MessageOrigin::Ptr& origin, const Dictionary::Ptr& message, const Endpoint::Ptr& currentZoneMaster)
1014 {
1015         ASSERT(targetZone);
1016
1017         Zone::Ptr localZone = Zone::GetLocalZone();
1018
1019         /* only relay the message to a) the same local zone, b) the parent zone and c) direct child zones. Exception is a global zone. */
1020         if (!targetZone->GetGlobal() &&
1021                 targetZone != localZone &&
1022                 targetZone != localZone->GetParent() &&
1023                 targetZone->GetParent() != localZone) {
1024                 return true;
1025         }
1026
1027         Endpoint::Ptr localEndpoint = GetLocalEndpoint();
1028
1029         std::vector<Endpoint::Ptr> skippedEndpoints;
1030
1031         bool relayed = false, log_needed = false, log_done = false;
1032
1033         std::set<Endpoint::Ptr> targetEndpoints;
1034
             /* Global zone: candidates are our own zone's endpoints plus those of every immediate child zone. */
1035         if (targetZone->GetGlobal()) {
1036                 targetEndpoints = localZone->GetEndpoints();
1037
1038                 for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1039                         /* Fetch immediate child zone members */
1040                         if (zone->GetParent() == localZone) {
1041                                 std::set<Endpoint::Ptr> endpoints = zone->GetEndpoints();
1042                                 targetEndpoints.insert(endpoints.begin(), endpoints.end());
1043                         }
1044                 }
1045         } else {
1046                 targetEndpoints = targetZone->GetEndpoints();
1047         }
1048
1049         for (const Endpoint::Ptr& targetEndpoint : targetEndpoints) {
1050                 /* Don't relay messages to ourselves. */
1051                 if (targetEndpoint == localEndpoint)
1052                         continue;
1053
                     /* At least one remote endpoint exists, so the return value now depends on log_done. */
1054                 log_needed = true;
1055
1056                 /* Don't relay messages to disconnected endpoints. */
1057                 if (!targetEndpoint->GetConnected()) {
                             /* Only within our own zone a disconnected endpoint resets log_done. */
1058                         if (targetZone == localZone)
1059                                 log_done = false;
1060
1061                         continue;
1062                 }
1063
1064                 log_done = true;
1065
1066                 /* Don't relay the message to the zone through more than one endpoint unless this is our own zone.
1067                  * 'relayed' is set to true on success below, enabling the checks in the second iteration.
1068                  */
1069                 if (relayed && targetZone != localZone) {
1070                         skippedEndpoints.push_back(targetEndpoint);
1071                         continue;
1072                 }
1073
1074                 /* Don't relay messages back to the endpoint which we got the message from. */
1075                 if (origin && origin->FromClient && targetEndpoint == origin->FromClient->GetEndpoint()) {
1076                         skippedEndpoints.push_back(targetEndpoint);
1077                         continue;
1078                 }
1079
1080                 /* Don't relay messages back to the zone which we got the message from. */
1081                 if (origin && origin->FromZone && targetZone == origin->FromZone) {
1082                         skippedEndpoints.push_back(targetEndpoint);
1083                         continue;
1084                 }
1085
1086                 /* Only relay message to the zone master if we're not currently the zone master.
1087                  * e1 is zone master, e2 and e3 are zone members.
1088                  *
1089                  * Message is sent from e2 or e3:
1090                  *   !isMaster == true
1091                  *   targetEndpoint e1 is zone master -> send the message
1092                  *   targetEndpoint e3 is not zone master -> skip it, avoid routing loops
1093                  *
1094                  * Message is sent from e1:
1095                  *   !isMaster == false -> send the messages to e2 and e3 being the zone routing master.
1096                  */
1097                 bool isMaster = (currentZoneMaster == localEndpoint);
1098
1099                 if (!isMaster && targetEndpoint != currentZoneMaster) {
1100                         skippedEndpoints.push_back(targetEndpoint);
1101                         continue;
1102                 }
1103
1104                 relayed = true;
1105
1106                 SyncSendMessage(targetEndpoint, message);
1107         }
1108
             /* Advance the local log position of every skipped endpoint to this message's timestamp. */
1109         if (!skippedEndpoints.empty()) {
1110                 double ts = message->Get("ts");
1111
1112                 for (const Endpoint::Ptr& skippedEndpoint : skippedEndpoints)
1113                         skippedEndpoint->SetLocalLogPosition(ts);
1114         }
1115
1116         return !log_needed || log_done;
1117 }
1118
1119 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
1120         const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
1121 {
1122         double ts = Utility::GetTime();
1123         message->Set("ts", ts);
1124
1125         Log(LogNotice, "ApiListener")
1126                 << "Relaying '" << message->Get("method") << "' message";
1127
1128         if (origin && origin->FromZone)
1129                 message->Set("originZone", origin->FromZone->GetName());
1130
1131         Zone::Ptr target_zone;
1132
1133         if (secobj) {
1134                 if (secobj->GetReflectionType() == Zone::TypeInstance)
1135                         target_zone = static_pointer_cast<Zone>(secobj);
1136                 else
1137                         target_zone = static_pointer_cast<Zone>(secobj->GetZone());
1138         }
1139
1140         if (!target_zone)
1141                 target_zone = Zone::GetLocalZone();
1142
1143         Endpoint::Ptr master = GetMaster();
1144
1145         bool need_log = !RelayMessageOne(target_zone, origin, message, master);
1146
1147         for (const Zone::Ptr& zone : target_zone->GetAllParentsRaw()) {
1148                 if (!RelayMessageOne(zone, origin, message, master))
1149                         need_log = true;
1150         }
1151
1152         if (log && need_log)
1153                 PersistMessage(message, secobj);
1154 }
1155
1156 /* must hold m_LogLock */
1157 void ApiListener::OpenLogFile()
1158 {
1159         String path = GetApiDir() + "log/current";
1160
1161         Utility::MkDirP(Utility::DirName(path), 0750);
1162
1163         auto *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
1164
1165         if (!fp->good()) {
1166                 Log(LogWarning, "ApiListener")
1167                         << "Could not open spool file: " << path;
1168                 return;
1169         }
1170
1171         m_LogFile = new StdioStream(fp, true);
1172         m_LogMessageCount = 0;
1173         SetLogMessageTimestamp(Utility::GetTime());
1174 }
1175
1176 /* must hold m_LogLock */
1177 void ApiListener::CloseLogFile()
1178 {
1179         if (!m_LogFile)
1180                 return;
1181
1182         m_LogFile->Close();
1183         m_LogFile.reset();
1184 }
1185
1186 /* must hold m_LogLock */
1187 void ApiListener::RotateLogFile()
1188 {
1189         double ts = GetLogMessageTimestamp();
1190
1191         if (ts == 0)
1192                 ts = Utility::GetTime();
1193
1194         String oldpath = GetApiDir() + "log/current";
1195         String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
1196
1197         // If the log is being rotated more than once per second,
1198         // don't overwrite the previous one, but silently deny rotation.
1199         if (!Utility::PathExists(newpath)) {
1200                 (void) rename(oldpath.CStr(), newpath.CStr());
1201         }
1202 }
1203
1204 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
1205 {
1206         String name = Utility::BaseName(file);
1207
1208         if (name == "current")
1209                 return;
1210
1211         int ts;
1212
1213         try {
1214                 ts = Convert::ToLong(name);
1215         } catch (const std::exception&) {
1216                 return;
1217         }
1218
1219         files.push_back(ts);
1220 }
1221
/**
 * Replays persisted cluster messages to a freshly connected endpoint.
 *
 * Returns early (clearing the endpoint's syncing flag) if the endpoint's log
 * duration is 0 or the endpoint has no zone. Otherwise the rotated log files
 * and the "current" log are read in one or more passes; every stored message
 * newer than the peer's position is sent through the given connection, unless
 * it references an object which no longer exists or which the endpoint's zone
 * may not access. After the final pass the syncing flag is cleared and the
 * log file is reopened.
 *
 * @param client The connection to replay the log to.
 */
1222 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
1223 {
1224         Endpoint::Ptr endpoint = client->GetEndpoint();
1225
1226         if (endpoint->GetLogDuration() == 0) {
1227                 ObjectLock olock2(endpoint);
1228                 endpoint->SetSyncing(false);
1229                 return;
1230         }
1231
1232         CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
1233
             /* count: messages sent in the previous pass (-1 = no pass yet).
              * peer_ts: timestamp of the last message known to be at the peer.
              * logpos_ts: last position announced via log::SetLogPosition.
              */
1234         int count = -1;
1235         double peer_ts = endpoint->GetLocalLogPosition();
1236         double logpos_ts = peer_ts;
1237         bool last_sync = false;
1238
1239         Endpoint::Ptr target_endpoint = client->GetEndpoint();
1240         ASSERT(target_endpoint);
1241
1242         Zone::Ptr target_zone = target_endpoint->GetZone();
1243
1244         if (!target_zone) {
1245                 ObjectLock olock2(endpoint);
1246                 endpoint->SetSyncing(false);
1247                 return;
1248         }
1249
1250         for (;;) {
1251                 boost::mutex::scoped_lock lock(m_LogLock);
1252
1253                 CloseLogFile();
1254
                     /* First pass, or the previous pass sent more than 50000 messages:
                      * reopen the log and release m_LogLock for this pass. Otherwise
                      * m_LogLock stays held and this becomes the final pass.
                      */
1255                 if (count == -1 || count > 50000) {
1256                         OpenLogFile();
1257                         lock.unlock();
1258                 } else {
1259                         last_sync = true;
1260                 }
1261
1262                 count = 0;
1263
1264                 std::vector<int> files;
1265                 Utility::Glob(GetApiDir() + "log/*", std::bind(&ApiListener::LogGlobHandler, std::ref(files), _1), GlobFile);
1266                 std::sort(files.begin(), files.end());
1267
1268                 std::vector<std::pair<int, String>> allFiles;
1269
1270                 for (int ts : files) {
1271                         if (ts >= peer_ts) {
1272                                 allFiles.emplace_back(ts, GetApiDir() + "log/" + Convert::ToString(ts));
1273                         }
1274                 }
1275
                     /* The "current" log is read last; it is keyed with "now + 1". */
1276                 allFiles.emplace_back(Utility::GetTime() + 1, GetApiDir() + "log/current");
1277
1278                 for (auto& file : allFiles) {
1279                         Log(LogNotice, "ApiListener")
1280                                 << "Replaying log: " << file.second;
1281
1282                         auto *fp = new std::fstream(file.second.CStr(), std::fstream::in | std::fstream::binary);
1283                         StdioStream::Ptr logStream = new StdioStream(fp, true);
1284
1285                         String message;
1286                         StreamReadContext src;
1287                         while (true) {
1288                                 Dictionary::Ptr pmessage;
1289
1290                                 try {
1291                                         StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
1292
1293                                         if (srs == StatusEof)
1294                                                 break;
1295
1296                                         if (srs != StatusNewItem)
1297                                                 continue;
1298
1299                                         pmessage = JsonDecode(message);
1300                                 } catch (const std::exception&) {
1301                                         Log(LogWarning, "ApiListener")
1302                                                 << "Unexpected end-of-file for cluster log: " << file.second;
1303
1304                                         /* Log files may be incomplete or corrupted. This is perfectly OK. */
1305                                         break;
1306                                 }
1307
                                     /* Skip everything that is not newer than the peer's position. */
1308                                 if (pmessage->Get("timestamp") <= peer_ts)
1309                                         continue;
1310
1311                                 Dictionary::Ptr secname = pmessage->Get("secobj");
1312
                                     /* Skip messages whose referenced object is gone or not accessible for the target zone. */
1313                                 if (secname) {
1314                                         ConfigObject::Ptr secobj = ConfigObject::GetObject(secname->Get("type"), secname->Get("name"));
1315
1316                                         if (!secobj)
1317                                                 continue;
1318
1319                                         if (!target_zone->CanAccessObject(secobj))
1320                                                 continue;
1321                                 }
1322
1323                                 try  {
1324                                         client->SendRawMessage(pmessage->Get("message"));
1325                                         count++;
1326                                 } catch (const std::exception& ex) {
1327                                         Log(LogWarning, "ApiListener")
1328                                                 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false);
1329
1330                                         Log(LogDebug, "ApiListener")
1331                                                 << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
1332
                                             /* Stops reading this file only; the remaining files are still processed. */
1333                                         break;
1334                                 }
1335
1336                                 peer_ts = pmessage->Get("timestamp");
1337
                                     /* Announce a new log position once the file's key is more than 10 ahead of the last announced one. */
1338                                 if (file.first > logpos_ts + 10) {
1339                                         logpos_ts = file.first;
1340
1341                                         Dictionary::Ptr lmessage = new Dictionary({
1342                                                 { "jsonrpc", "2.0" },
1343                                                 { "method", "log::SetLogPosition" },
1344                                                 { "params", new Dictionary({
1345                                                         { "log_position", logpos_ts }
1346                                                 }) }
1347                                         });
1348
1349                                         client->SendMessage(lmessage);
1350                                 }
1351                         }
1352
1353                         logStream->Close();
1354                 }
1355
1356                 if (count > 0) {
1357                         Log(LogInformation, "ApiListener")
1358                                 << "Replayed " << count << " messages.";
1359                 }
1360                 else {
1361                         Log(LogNotice, "ApiListener")
1362                                 << "Replayed " << count << " messages.";
1363                 }
1364
                     /* Final pass done (m_LogLock still held): end syncing and reopen the log closed above. */
1365                 if (last_sync) {
1366                         {
1367                                 ObjectLock olock2(endpoint);
1368                                 endpoint->SetSyncing(false);
1369                         }
1370
1371                         OpenLogFile();
1372
1373                         break;
1374                 }
1375         }
1376 }
1377
1378 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
1379 {
1380         std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
1381
1382         ApiListener::Ptr listener = ApiListener::GetInstance();
1383
1384         if (!listener)
1385                 return;
1386
1387         stats = listener->GetStatus();
1388
1389         ObjectLock olock(stats.second);
1390         for (const Dictionary::Pair& kv : stats.second)
1391                 perfdata->Add(new PerfdataValue("api_" + kv.first, kv.second));
1392
1393         status->Set("api", stats.first);
1394 }
1395
/**
 * Collects cluster and connection statistics for this listener.
 *
 * Only zones which are our own zone, our parent zone or an immediate child
 * zone are inspected. For each of them the connection state, the largest
 * positive client log lag, the endpoint names and the parent zone name are
 * reported. Queue lengths, item rates and client counts are added as well.
 *
 * @returns A pair of (status dictionary, perfdata dictionary).
 */
1396 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
1397 {
1398         Dictionary::Ptr perfdata = new Dictionary();
1399
1400         /* cluster stats */
1401
1402         double allEndpoints = 0;
1403         Array::Ptr allNotConnectedEndpoints = new Array();
1404         Array::Ptr allConnectedEndpoints = new Array();
1405
1406         Zone::Ptr my_zone = Zone::GetLocalZone();
1407
1408         Dictionary::Ptr connectedZones = new Dictionary();
1409
1410         for (const Zone::Ptr& zone : ConfigType::GetObjectsByType<Zone>()) {
1411                 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
1412                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
1413                         Log(LogDebug, "ApiListener")
1414                                 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
1415                         continue;
1416                 }
1417
1418                 bool zoneConnected = false;
1419                 int countZoneEndpoints = 0;
1420                 double zoneLag = 0;
1421
1422                 ArrayData zoneEndpoints;
1423
1424                 for (const Endpoint::Ptr& endpoint : zone->GetEndpoints()) {
1425                         zoneEndpoints.emplace_back(endpoint->GetName());
1426
                             /* Our own endpoint is listed by name but excluded from all counters. */
1427                         if (endpoint->GetName() == GetIdentity())
1428                                 continue;
1429
1430                         double eplag = CalculateZoneLag(endpoint);
1431
                             /* The zone's lag is the largest positive lag among its endpoints. */
1432                         if (eplag > 0 && eplag > zoneLag)
1433                                 zoneLag = eplag;
1434
1435                         allEndpoints++;
1436                         countZoneEndpoints++;
1437
1438                         if (!endpoint->GetConnected()) {
1439                                 allNotConnectedEndpoints->Add(endpoint->GetName());
1440                         } else {
1441                                 allConnectedEndpoints->Add(endpoint->GetName());
1442                                 zoneConnected = true;
1443                         }
1444                 }
1445
1446                 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
1447                 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
1448                         zoneConnected = true;
1449
1450                 String parentZoneName;
1451                 Zone::Ptr parentZone = zone->GetParent();
1452                 if (parentZone)
1453                         parentZoneName = parentZone->GetName();
1454
1455                 Dictionary::Ptr zoneStats = new Dictionary({
1456                         { "connected", zoneConnected },
1457                         { "client_log_lag", zoneLag },
1458                         { "endpoints", new Array(std::move(zoneEndpoints)) },
1459                         { "parent_zone", parentZoneName }
1460                 });
1461
1462                 connectedZones->Set(zone->GetName(), zoneStats);
1463         }
1464
1465         /* connection stats */
1466         size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
1467         size_t httpClients = GetHttpClients().size();
1468         size_t syncQueueItems = m_SyncQueue.GetLength();
1469         size_t relayQueueItems = m_RelayQueue.GetLength();
1470         double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
             /* GetTaskCount(60) divided by 60.0 yields the queue rates reported below. */
1471         double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
1472         double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
1473
1474         Dictionary::Ptr status = new Dictionary({
1475                 { "identity", GetIdentity() },
1476                 { "num_endpoints", allEndpoints },
1477                 { "num_conn_endpoints", allConnectedEndpoints->GetLength() },
1478                 { "num_not_conn_endpoints", allNotConnectedEndpoints->GetLength() },
1479                 { "conn_endpoints", allConnectedEndpoints },
1480                 { "not_conn_endpoints", allNotConnectedEndpoints },
1481
1482                 { "zones", connectedZones },
1483
1484                 { "json_rpc", new Dictionary({
1485                         { "anonymous_clients", jsonRpcAnonymousClients },
1486                         { "sync_queue_items", syncQueueItems },
1487                         { "relay_queue_items", relayQueueItems },
1488                         { "work_queue_item_rate", workQueueItemRate },
1489                         { "sync_queue_item_rate", syncQueueItemRate },
1490                         { "relay_queue_item_rate", relayQueueItemRate }
1491                 }) },
1492
1493                 { "http", new Dictionary({
1494                         { "clients", httpClients }
1495                 }) }
1496         });
1497
1498         /* performance data */
1499         perfdata->Set("num_endpoints", allEndpoints);
1500         perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
1501         perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
1502
1503         perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
1504         perfdata->Set("num_http_clients", httpClients);
1505         perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
1506         perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
1507
1508         perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
1509         perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
1510         perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);
1511
1512         return std::make_pair(status, perfdata);
1513 }
1514
1515 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
1516 {
1517         double remoteLogPosition = endpoint->GetRemoteLogPosition();
1518         double eplag = Utility::GetTime() - remoteLogPosition;
1519
1520         if ((endpoint->GetSyncing() || !endpoint->GetConnected()) && remoteLogPosition != 0)
1521                 return eplag;
1522
1523         return 0;
1524 }
1525
1526 bool ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1527 {
1528         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1529
1530         if (GetMaxAnonymousClients() >= 0 && (long)m_AnonymousClients.size() + 1 > (long)GetMaxAnonymousClients())
1531                 return false;
1532
1533         m_AnonymousClients.insert(aclient);
1534         return true;
1535 }
1536
1537 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
1538 {
1539         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1540         m_AnonymousClients.erase(aclient);
1541 }
1542
1543 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
1544 {
1545         boost::mutex::scoped_lock lock(m_AnonymousClientsLock);
1546         return m_AnonymousClients;
1547 }
1548
1549 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
1550 {
1551         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1552         m_HttpClients.insert(aclient);
1553 }
1554
1555 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
1556 {
1557         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1558         m_HttpClients.erase(aclient);
1559 }
1560
1561 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients() const
1562 {
1563         boost::mutex::scoped_lock lock(m_HttpClientsLock);
1564         return m_HttpClients;
1565 }
1566
1567 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
1568 {
1569         return Empty;
1570 }
1571
1572 Endpoint::Ptr ApiListener::GetLocalEndpoint() const
1573 {
1574         return m_LocalEndpoint;
1575 }
1576
1577 void ApiListener::UpdateActivePackageStagesCache()
1578 {
1579         boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1580
1581         for (auto package : ConfigPackageUtility::GetPackages()) {
1582                 String activeStage;
1583
1584                 try {
1585                         activeStage = ConfigPackageUtility::GetActiveStageFromFile(package);
1586                 } catch (const std::exception& ex) {
1587                         Log(LogCritical, "ApiListener")
1588                                 << ex.what();
1589                         continue;
1590                 }
1591
1592                 Log(LogNotice, "ApiListener")
1593                         << "Updating cache: Config package '" << package << "' has active stage '" << activeStage << "'.";
1594
1595                 m_ActivePackageStages[package] = activeStage;
1596         }
1597 }
1598
1599 void ApiListener::CheckApiPackageIntegrity()
1600 {
1601         boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1602
1603         for (auto package : ConfigPackageUtility::GetPackages()) {
1604                 String activeStage;
1605                 try {
1606                         activeStage = ConfigPackageUtility::GetActiveStageFromFile(package);
1607                 } catch (const std::exception& ex) {
1608                         /* An error means that the stage is broken, try to repair it. */
1609                         auto it = m_ActivePackageStages.find(package);
1610
1611                         if (it == m_ActivePackageStages.end())
1612                                 continue;
1613
1614                         String activeStageCached = it->second;
1615
1616                         Log(LogInformation, "ApiListener")
1617                                 << "Repairing broken API config package '" << package
1618                                 << "', setting active stage '" << activeStageCached << "'.";
1619
1620                         ConfigPackageUtility::SetActiveStageToFile(package, activeStageCached);
1621                 }
1622         }
1623 }
1624
1625 void ApiListener::SetActivePackageStage(const String& package, const String& stage)
1626 {
1627         boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1628         m_ActivePackageStages[package] = stage;
1629 }
1630
1631 String ApiListener::GetActivePackageStage(const String& package)
1632 {
1633         boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1634
1635         if (m_ActivePackageStages.find(package) == m_ActivePackageStages.end())
1636                 BOOST_THROW_EXCEPTION(ScriptError("Package " + package + " has no active stage."));
1637
1638         return m_ActivePackageStages[package];
1639 }
1640
1641 void ApiListener::RemoveActivePackageStage(const String& package)
1642 {
1643         /* This is the rare occassion when a package has been deleted. */
1644         boost::mutex::scoped_lock lock(m_ActivePackageStagesLock);
1645
1646         auto it = m_ActivePackageStages.find(package);
1647
1648         if (it == m_ActivePackageStages.end())
1649                 return;
1650
1651         m_ActivePackageStages.erase(it);
1652 }
1653
1654 void ApiListener::ValidateTlsProtocolmin(const Lazy<String>& lvalue, const ValidationUtils& utils)
1655 {
1656         ObjectImpl<ApiListener>::ValidateTlsProtocolmin(lvalue, utils);
1657
1658         if (lvalue() != SSL_TXT_TLSV1_2) {
1659                 String message = "Invalid TLS version. Must be '" SSL_TXT_TLSV1_2 "'";
1660
1661                 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_protocolmin" }, message));
1662         }
1663 }
1664
1665 void ApiListener::ValidateTlsHandshakeTimeout(const Lazy<double>& lvalue, const ValidationUtils& utils)
1666 {
1667         ObjectImpl<ApiListener>::ValidateTlsHandshakeTimeout(lvalue, utils);
1668
1669         if (lvalue() <= 0)
1670                 BOOST_THROW_EXCEPTION(ValidationError(this, { "tls_handshake_timeout" }, "Value must be greater than 0."));
1671 }
1672
1673 bool ApiListener::IsHACluster()
1674 {
1675         Zone::Ptr zone = Zone::GetLocalZone();
1676
1677         if (!zone)
1678                 return false;
1679
1680         return zone->IsSingleInstance();
1681 }
1682
1683 /* Provide a helper function for zone origin name. */
1684 String ApiListener::GetFromZoneName(const Zone::Ptr& fromZone)
1685 {
1686         String fromZoneName;
1687
1688         if (fromZone) {
1689                 fromZoneName = fromZone->GetName();
1690         } else {
1691                 Zone::Ptr lzone = Zone::GetLocalZone();
1692
1693                 if (lzone)
1694                         fromZoneName = lzone->GetName();
1695         }
1696
1697         return fromZoneName;
1698 }
1699
1700 void ApiListener::UpdateStatusFile(boost::asio::ip::tcp::endpoint localEndpoint)
1701 {
1702         String path = Configuration::CacheDir + "/api-state.json";
1703
1704         Utility::SaveJsonFile(path, 0644, new Dictionary({
1705                 {"host", String(localEndpoint.address().to_string())},
1706                 {"port", localEndpoint.port()}
1707         }));
1708 }
1709
1710 void ApiListener::RemoveStatusFile()
1711 {
1712         String path = Configuration::CacheDir + "/api-state.json";
1713
1714         Utility::Remove(path);
1715 }