]> granicus.if.org Git - icinga2/blob - lib/remote/apilistener.cpp
f1288dcafd73bf4c44f3a04557e19f47681ffbc3
[icinga2] / lib / remote / apilistener.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "remote/apilistener.hpp"
21 #include "remote/apilistener.tcpp"
22 #include "remote/jsonrpcconnection.hpp"
23 #include "remote/endpoint.hpp"
24 #include "remote/jsonrpc.hpp"
25 #include "remote/apifunction.hpp"
26 #include "base/convert.hpp"
27 #include "base/netstring.hpp"
28 #include "base/json.hpp"
29 #include "base/configtype.hpp"
30 #include "base/logger.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/stdiostream.hpp"
33 #include "base/application.hpp"
34 #include "base/context.hpp"
35 #include "base/statsfunction.hpp"
36 #include "base/exception.hpp"
37 #include <fstream>
38
39 using namespace icinga;
40
41 REGISTER_TYPE(ApiListener);
42
43 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
44
45 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
46
47 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
48
49 ApiListener::ApiListener(void)
50         : m_LogMessageCount(0)
51 { }
52
53 void ApiListener::OnConfigLoaded(void)
54 {
55         /* set up SSL context */
56         boost::shared_ptr<X509> cert;
57         try {
58                 cert = GetX509Certificate(GetCertPath());
59         } catch (const std::exception&) {
60                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
61                     + GetCertPath() + "'.", GetDebugInfo()));
62         }
63
64         try {
65                 SetIdentity(GetCertificateCN(cert));
66         } catch (const std::exception&) {
67                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
68                     + GetCertPath() + "'.", GetDebugInfo()));
69         }
70
71         Log(LogInformation, "ApiListener")
72             << "My API identity: " << GetIdentity();
73
74         try {
75                 m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
76         } catch (const std::exception&) {
77                 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
78                     + GetCertPath() + "' key path: '" + GetKeyPath() + "' ca path: '" + GetCaPath() + "'.", GetDebugInfo()));
79         }
80
81         if (!GetCrlPath().IsEmpty()) {
82                 try {
83                         AddCRLToSSLContext(m_SSLContext, GetCrlPath());
84                 } catch (const std::exception&) {
85                         BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
86                             + GetCrlPath() + "'.", GetDebugInfo()));
87                 }
88         }
89 }
90
91 void ApiListener::OnAllConfigLoaded(void)
92 {
93         if (!Endpoint::GetByName(GetIdentity()))
94                 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
95 }
96
97 /**
98  * Starts the component.
99  */
100 void ApiListener::Start(void)
101 {
102         SyncZoneDirs();
103
104         if (std::distance(ConfigType::GetObjectsByType<ApiListener>().first,
105             ConfigType::GetObjectsByType<ApiListener>().second) > 1) {
106                 Log(LogCritical, "ApiListener", "Only one ApiListener object is allowed.");
107                 return;
108         }
109
110         ObjectImpl<ApiListener>::Start();
111
112         {
113                 boost::mutex::scoped_lock(m_LogLock);
114                 RotateLogFile();
115                 OpenLogFile();
116         }
117
118         /* create the primary JSON-RPC listener */
119         if (!AddListener(GetBindHost(), GetBindPort())) {
120                 Log(LogCritical, "ApiListener")
121                      << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
122                 Application::Exit(EXIT_FAILURE);
123         }
124
125         m_Timer = new Timer();
126         m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
127         m_Timer->SetInterval(5);
128         m_Timer->Start();
129         m_Timer->Reschedule(0);
130
131         OnMasterChanged(true);
132 }
133
134 ApiListener::Ptr ApiListener::GetInstance(void)
135 {
136         BOOST_FOREACH(const ApiListener::Ptr& listener, ConfigType::GetObjectsByType<ApiListener>())
137                 return listener;
138
139         return ApiListener::Ptr();
140 }
141
142 boost::shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
143 {
144         return m_SSLContext;
145 }
146
147 Endpoint::Ptr ApiListener::GetMaster(void) const
148 {
149         Zone::Ptr zone = Zone::GetLocalZone();
150
151         if (!zone)
152                 return Endpoint::Ptr();
153
154         std::vector<String> names;
155
156         BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
157                 if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
158                         names.push_back(endpoint->GetName());
159
160         std::sort(names.begin(), names.end());
161
162         return Endpoint::GetByName(*names.begin());
163 }
164
165 bool ApiListener::IsMaster(void) const
166 {
167         Endpoint::Ptr master = GetMaster();
168
169         if (!master)
170                 return false;
171
172         return master->GetName() == GetIdentity();
173 }
174
175 /**
176  * Creates a new JSON-RPC listener on the specified port.
177  *
178  * @param node The host the listener should be bound to.
179  * @param service The port to listen on.
180  */
181 bool ApiListener::AddListener(const String& node, const String& service)
182 {
183         ObjectLock olock(this);
184
185         boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
186
187         if (!sslContext) {
188                 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
189                 return false;
190         }
191
192         Log(LogInformation, "ApiListener")
193             << "Adding new listener on port '" << service << "'";
194
195         TcpSocket::Ptr server = new TcpSocket();
196
197         try {
198                 server->Bind(node, service, AF_UNSPEC);
199         } catch (const std::exception&) {
200                 Log(LogCritical, "ApiListener")
201                     << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
202                 return false;
203         }
204
205         boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
206         thread.detach();
207
208         m_Servers.insert(server);
209
210         return true;
211 }
212
213 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
214 {
215         Utility::SetThreadName("API Listener");
216
217         server->Listen();
218
219         for (;;) {
220                 try {
221                         Socket::Ptr client = server->Accept();
222                         boost::thread thread(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
223                         thread.detach();
224                 } catch (const std::exception&) {
225                         Log(LogCritical, "ApiListener", "Cannot accept new connection.");
226                 }
227         }
228 }
229
230 /**
231  * Creates a new JSON-RPC client and connects to the specified endpoint.
232  *
233  * @param endpoint The endpoint.
234  */
235 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
236 {
237         {
238                 ObjectLock olock(this);
239
240                 boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
241
242                 if (!sslContext) {
243                         Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
244                         return;
245                 }
246         }
247
248         String host = endpoint->GetHost();
249         String port = endpoint->GetPort();
250
251         Log(LogInformation, "JsonRpcConnection")
252             << "Reconnecting to API endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
253
254         TcpSocket::Ptr client = new TcpSocket();
255
256         try {
257                 endpoint->SetConnecting(true);
258                 client->Connect(host, port);
259                 NewClientHandler(client, endpoint->GetName(), RoleClient);
260                 endpoint->SetConnecting(false);
261         } catch (const std::exception& ex) {
262                 endpoint->SetConnecting(false);
263                 client->Close();
264
265                 std::ostringstream info;
266                 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
267                 Log(LogCritical, "ApiListener", info.str());
268                 Log(LogDebug, "ApiListener")
269                     << info.str() << "\n" << DiagnosticInformation(ex);
270         }
271 }
272
273 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
274 {
275         try {
276                 NewClientHandlerInternal(client, hostname, role);
277         } catch (const std::exception& ex) {
278                 Log(LogCritical, "ApiListener")
279                     << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
280         }
281 }
282
283 /**
284  * Processes a new client connection.
285  *
286  * @param client The new client.
287  */
288 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
289 {
290         CONTEXT("Handling new API client connection");
291
292         TlsStream::Ptr tlsStream;
293
294         {
295                 ObjectLock olock(this);
296                 try {
297                         tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
298                 } catch (const std::exception&) {
299                         Log(LogCritical, "ApiListener", "Cannot create TLS stream from client connection.");
300                         return;
301                 }
302         }
303
304         try {
305                 tlsStream->Handshake();
306         } catch (const std::exception& ex) {
307                 Log(LogCritical, "ApiListener", "Client TLS handshake failed");
308                 return;
309         }
310
311         boost::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
312         String identity;
313         Endpoint::Ptr endpoint;
314         bool verify_ok = false;
315
316         if (cert) {
317                 try {
318                         identity = GetCertificateCN(cert);
319                 } catch (const std::exception&) {
320                         Log(LogCritical, "ApiListener")
321                             << "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
322                         return;
323                 }
324
325                 verify_ok = tlsStream->IsVerifyOK();
326
327                 Log(LogInformation, "ApiListener")
328                     << "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
329
330
331                 if (verify_ok)
332                         endpoint = Endpoint::GetByName(identity);
333         } else {
334                 Log(LogInformation, "ApiListener")
335                     << "New client connection (no client certificate)";
336         }
337
338         bool need_sync = false;
339
340         if (endpoint)
341                 need_sync = !endpoint->IsConnected();
342
343         ClientType ctype;
344
345         if (role == RoleClient) {
346                 Dictionary::Ptr message = new Dictionary();
347                 message->Set("jsonrpc", "2.0");
348                 message->Set("method", "icinga::Hello");
349                 message->Set("params", new Dictionary());
350                 JsonRpc::SendMessage(tlsStream, message);
351                 ctype = ClientJsonRpc;
352         } else {
353                 tlsStream->WaitForData(5);
354
355                 if (!tlsStream->IsDataAvailable()) {
356                         Log(LogWarning, "ApiListener", "No data received on new API connection.");
357                         return;
358                 }
359
360                 char firstByte;
361                 tlsStream->Peek(&firstByte, 1, false);
362
363                 if (firstByte >= '0' && firstByte <= '9')
364                         ctype = ClientJsonRpc;
365                 else
366                         ctype = ClientHttp;
367         }
368
369         if (ctype == ClientJsonRpc) {
370                 Log(LogInformation, "ApiListener", "New JSON-RPC client");
371
372                 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
373                 aclient->Start();
374
375                 if (endpoint) {
376                         endpoint->AddClient(aclient);
377
378                         /* sync zone file config */
379                         SendConfigUpdate(aclient);
380                         /* sync runtime config */
381                         SendRuntimeConfigObjects(aclient);
382
383                         if (need_sync) {
384                                 {
385                                         ObjectLock olock(endpoint);
386
387                                         endpoint->SetSyncing(true);
388                                 }
389
390                                 ReplayLog(aclient);
391                         }
392                 } else
393                         AddAnonymousClient(aclient);
394         } else {
395                 Log(LogInformation, "ApiListener", "New HTTP client");
396
397                 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
398                 aclient->Start();
399                 AddHttpClient(aclient);
400         }
401 }
402
403 void ApiListener::ApiTimerHandler(void)
404 {
405         double now = Utility::GetTime();
406
407         std::vector<int> files;
408         Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
409         std::sort(files.begin(), files.end());
410
411         BOOST_FOREACH(int ts, files) {
412                 bool need = false;
413
414                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
415                         if (endpoint->GetName() == GetIdentity())
416                                 continue;
417
418                         if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
419                                 continue;
420
421                         if (ts > endpoint->GetLocalLogPosition()) {
422                                 need = true;
423                                 break;
424                         }
425                 }
426
427                 if (!need) {
428                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
429                         Log(LogNotice, "ApiListener")
430                             << "Removing old log file: " << path;
431                         (void)unlink(path.CStr());
432                 }
433         }
434
435         Zone::Ptr my_zone = Zone::GetLocalZone();
436
437         BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
438                 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
439                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
440                         Log(LogDebug, "ApiListener")
441                             << "Not connecting to Zone '" << zone->GetName()
442                             << "' because it's not in the same zone, a parent or a child zone.";
443                         continue;
444                 }
445
446                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
447                         /* don't connect to ourselves */
448                         if (endpoint->GetName() == GetIdentity()) {
449                                 Log(LogDebug, "ApiListener")
450                                     << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
451                                 continue;
452                         }
453
454                         /* don't try to connect to endpoints which don't have a host and port */
455                         if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
456                                 Log(LogDebug, "ApiListener")
457                                     << "Not connecting to Endpoint '" << endpoint->GetName()
458                                     << "' because the host/port attributes are missing.";
459                                 continue;
460                         }
461
462                         /* don't try to connect if there's already a connection attempt */
463                         if (endpoint->GetConnecting()) {
464                                 Log(LogDebug, "ApiListener")
465                                     << "Not connecting to Endpoint '" << endpoint->GetName()
466                                     << "' because we're already trying to connect to it.";
467                                 continue;
468                         }
469
470                         /* don't try to connect if we're already connected */
471                         if (endpoint->IsConnected()) {
472                                 Log(LogDebug, "ApiListener")
473                                     << "Not connecting to Endpoint '" << endpoint->GetName()
474                                     << "' because we're already connected to it.";
475                                 continue;
476                         }
477
478                         boost::thread thread(boost::bind(&ApiListener::AddConnection, this, endpoint));
479                         thread.detach();
480                 }
481         }
482
483         BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
484                 if (!endpoint->IsConnected())
485                         continue;
486
487                 double ts = endpoint->GetRemoteLogPosition();
488
489                 if (ts == 0)
490                         continue;
491
492                 Dictionary::Ptr lparams = new Dictionary();
493                 lparams->Set("log_position", ts);
494
495                 Dictionary::Ptr lmessage = new Dictionary();
496                 lmessage->Set("jsonrpc", "2.0");
497                 lmessage->Set("method", "log::SetLogPosition");
498                 lmessage->Set("params", lparams);
499
500                 BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
501                         client->SendMessage(lmessage);
502
503                 Log(LogNotice, "ApiListener")
504                     << "Setting log position for identity '" << endpoint->GetName() << "': "
505                     << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
506         }
507
508         Endpoint::Ptr master = GetMaster();
509
510         if (master)
511                 Log(LogNotice, "ApiListener")
512                     << "Current zone master: " << master->GetName();
513
514         std::vector<String> names;
515         BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>())
516                 if (endpoint->IsConnected())
517                         names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
518
519         Log(LogNotice, "ApiListener")
520             << "Connected endpoints: " << Utility::NaturalJoin(names);
521 }
522
523 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
524     const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
525 {
526         m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true);
527 }
528
529 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
530 {
531         double ts = message->Get("ts");
532
533         ASSERT(ts != 0);
534
535         Dictionary::Ptr pmessage = new Dictionary();
536         pmessage->Set("timestamp", ts);
537
538         pmessage->Set("message", JsonEncode(message));
539         Dictionary::Ptr secname = new Dictionary();
540         secname->Set("type", secobj->GetType()->GetName());
541         secname->Set("name", secobj->GetName());
542         pmessage->Set("secobj", secname);
543
544         boost::mutex::scoped_lock lock(m_LogLock);
545         if (m_LogFile) {
546                 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
547                 m_LogMessageCount++;
548                 SetLogMessageTimestamp(ts);
549
550                 if (m_LogMessageCount > 50000) {
551                         CloseLogFile();
552                         RotateLogFile();
553                         OpenLogFile();
554                 }
555         }
556 }
557
558 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
559 {
560         ObjectLock olock(endpoint);
561
562         if (!endpoint->GetSyncing()) {
563                 Log(LogNotice, "ApiListener")
564                     << "Sending message to '" << endpoint->GetName() << "'";
565
566                 BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
567                         client->SendMessage(message);
568         }
569 }
570
571
572 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
573     const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
574 {
575         double ts = Utility::GetTime();
576         message->Set("ts", ts);
577
578         Log(LogNotice, "ApiListener")
579             << "Relaying '" << message->Get("method") << "' message";
580
581         if (log)
582                 PersistMessage(message, secobj);
583
584         if (origin && origin->FromZone)
585                 message->Set("originZone", origin->FromZone->GetName());
586
587         bool is_master = IsMaster();
588         Endpoint::Ptr master = GetMaster();
589         Zone::Ptr my_zone = Zone::GetLocalZone();
590
591         std::vector<Endpoint::Ptr> skippedEndpoints;
592         std::set<Zone::Ptr> finishedZones;
593
594         BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
595                 /* don't relay messages to ourselves or disconnected endpoints */
596                 if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
597                         continue;
598
599                 Zone::Ptr target_zone = endpoint->GetZone();
600
601                 /* don't relay the message to the zone through more than one endpoint */
602                 if (finishedZones.find(target_zone) != finishedZones.end()) {
603                         skippedEndpoints.push_back(endpoint);
604                         continue;
605                 }
606
607                 /* don't relay messages back to the endpoint which we got the message from */
608                 if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
609                         skippedEndpoints.push_back(endpoint);
610                         continue;
611                 }
612
613                 /* don't relay messages back to the zone which we got the message from */
614                 if (origin && origin->FromZone && target_zone == origin->FromZone) {
615                         skippedEndpoints.push_back(endpoint);
616                         continue;
617                 }
618
619                 /* only relay message to the master if we're not currently the master */
620                 if (!is_master && master != endpoint) {
621                         skippedEndpoints.push_back(endpoint);
622                         continue;
623                 }
624
625                 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
626                 if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
627                     secobj->GetZoneName() != target_zone->GetName()) {
628                         skippedEndpoints.push_back(endpoint);
629                         continue;
630                 }
631
632                 /* only relay messages to zones which have access to the object */
633                 if (!target_zone->CanAccessObject(secobj))
634                         continue;
635
636                 finishedZones.insert(target_zone);
637
638                 SyncSendMessage(endpoint, message);
639         }
640
641         BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
642                 endpoint->SetLocalLogPosition(ts);
643 }
644
645 String ApiListener::GetApiDir(void)
646 {
647         return Application::GetLocalStateDir() + "/lib/icinga2/api/";
648 }
649
650 /* must hold m_LogLock */
651 void ApiListener::OpenLogFile(void)
652 {
653         String path = GetApiDir() + "log/current";
654
655         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
656
657         if (!fp->good()) {
658                 Log(LogWarning, "ApiListener")
659                     << "Could not open spool file: " << path;
660                 return;
661         }
662
663         m_LogFile = new StdioStream(fp, true);
664         m_LogMessageCount = 0;
665         SetLogMessageTimestamp(Utility::GetTime());
666 }
667
668 /* must hold m_LogLock */
669 void ApiListener::CloseLogFile(void)
670 {
671         if (!m_LogFile)
672                 return;
673
674         m_LogFile->Close();
675         m_LogFile.reset();
676 }
677
678 /* must hold m_LogLock */
679 void ApiListener::RotateLogFile(void)
680 {
681         double ts = GetLogMessageTimestamp();
682
683         if (ts == 0)
684                 ts = Utility::GetTime();
685
686         String oldpath = GetApiDir() + "log/current";
687         String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
688         (void) rename(oldpath.CStr(), newpath.CStr());
689 }
690
691 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
692 {
693         String name = Utility::BaseName(file);
694
695         if (name == "current")
696                 return;
697
698         int ts;
699
700         try {
701                 ts = Convert::ToLong(name);
702         } catch (const std::exception&) {
703                 return;
704         }
705
706         files.push_back(ts);
707 }
708
709 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
710 {
711         Endpoint::Ptr endpoint = client->GetEndpoint();
712
713         CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
714
715         int count = -1;
716         double peer_ts = endpoint->GetLocalLogPosition();
717         double logpos_ts = peer_ts;
718         bool last_sync = false;
719
720         Endpoint::Ptr target_endpoint = client->GetEndpoint();
721         ASSERT(target_endpoint);
722
723         Zone::Ptr target_zone = target_endpoint->GetZone();
724
725         if (!target_zone)
726                 return;
727
728         for (;;) {
729                 boost::mutex::scoped_lock lock(m_LogLock);
730
731                 CloseLogFile();
732                 RotateLogFile();
733
734                 if (count == -1 || count > 50000) {
735                         OpenLogFile();
736                         lock.unlock();
737                 } else {
738                         last_sync = true;
739                 }
740
741                 count = 0;
742
743                 std::vector<int> files;
744                 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
745                 std::sort(files.begin(), files.end());
746
747                 BOOST_FOREACH(int ts, files) {
748                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
749
750                         if (ts < peer_ts)
751                                 continue;
752
753                         Log(LogNotice, "ApiListener")
754                             << "Replaying log: " << path;
755
756                         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
757                         StdioStream::Ptr logStream = new StdioStream(fp, true);
758
759                         String message;
760                         StreamReadContext src;
761                         while (true) {
762                                 Dictionary::Ptr pmessage;
763
764                                 try {
765                                         StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
766
767                                         if (srs == StatusEof)
768                                                 break;
769
770                                         if (srs != StatusNewItem)
771                                                 continue;
772
773                                         pmessage = JsonDecode(message);
774                                 } catch (const std::exception&) {
775                                         Log(LogWarning, "ApiListener")
776                                             << "Unexpected end-of-file for cluster log: " << path;
777
778                                         /* Log files may be incomplete or corrupted. This is perfectly OK. */
779                                         break;
780                                 }
781
782                                 if (pmessage->Get("timestamp") <= peer_ts)
783                                         continue;
784
785                                 Dictionary::Ptr secname = pmessage->Get("secobj");
786
787                                 if (secname) {
788                                         ConfigType::Ptr dtype = ConfigType::GetByName(secname->Get("type"));
789
790                                         if (!dtype)
791                                                 continue;
792
793                                         ConfigObject::Ptr secobj = dtype->GetObject(secname->Get("name"));
794
795                                         if (!secobj)
796                                                 continue;
797
798                                         if (!target_zone->CanAccessObject(secobj))
799                                                 continue;
800                                 }
801
802                                 NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
803                                 count++;
804
805                                 peer_ts = pmessage->Get("timestamp");
806
807                                 if (ts > logpos_ts + 10) {
808                                         logpos_ts = ts;
809
810                                         Dictionary::Ptr lparams = new Dictionary();
811                                         lparams->Set("log_position", logpos_ts);
812
813                                         Dictionary::Ptr lmessage = new Dictionary();
814                                         lmessage->Set("jsonrpc", "2.0");
815                                         lmessage->Set("method", "log::SetLogPosition");
816                                         lmessage->Set("params", lparams);
817
818                                         JsonRpc::SendMessage(client->GetStream(), lmessage);
819                                 }
820                         }
821
822                         logStream->Close();
823                 }
824
825                 if (count > 0) {
826                         Log(LogInformation, "ApiListener")
827                            << "Replayed " << count << " messages.";
828                 }
829
830                 Log(LogNotice, "ApiListener")
831                    << "Replayed " << count << " messages.";
832
833                 if (last_sync) {
834                         {
835                                 ObjectLock olock2(endpoint);
836                                 endpoint->SetSyncing(false);
837                         }
838
839                         OpenLogFile();
840
841                         break;
842                 }
843         }
844 }
845
846 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
847 {
848         std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
849
850         ApiListener::Ptr listener = ApiListener::GetInstance();
851
852         if (!listener)
853                 return;
854
855         stats = listener->GetStatus();
856
857         ObjectLock olock(stats.second);
858         BOOST_FOREACH(const Dictionary::Pair& kv, stats.second)
859                 perfdata->Add("'api_" + kv.first + "'=" + Convert::ToString(kv.second));
860
861         status->Set("api", stats.first);
862 }
863
864 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
865 {
866         Dictionary::Ptr status = new Dictionary();
867         Dictionary::Ptr perfdata = new Dictionary();
868
869         /* cluster stats */
870         status->Set("identity", GetIdentity());
871
872         double allEndpoints = 0;
873         Array::Ptr allNotConnectedEndpoints = new Array();
874         Array::Ptr allConnectedEndpoints = new Array();
875
876         Zone::Ptr my_zone = Zone::GetLocalZone();
877
878         Dictionary::Ptr connectedZones = new Dictionary();
879
880         BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
881                 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
882                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
883                         Log(LogDebug, "ApiListener")
884                             << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
885                         continue;
886                 }
887
888                 bool zoneConnected = false;
889                 int countZoneEndpoints = 0;
890                 double zoneLag = 0;
891
892                 Array::Ptr zoneEndpoints = new Array();
893
894                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
895                         zoneEndpoints->Add(endpoint->GetName());
896
897                         if (endpoint->GetName() == GetIdentity())
898                                 continue;
899
900                         double eplag = CalculateZoneLag(endpoint);
901
902                         if (eplag > 0 && eplag > zoneLag)
903                                 zoneLag = eplag;
904
905                         allEndpoints++;
906                         countZoneEndpoints++;
907
908                         if (!endpoint->IsConnected()) {
909                                 allNotConnectedEndpoints->Add(endpoint->GetName());
910                         } else {
911                                 allConnectedEndpoints->Add(endpoint->GetName());
912                                 zoneConnected = true;
913                         }
914                 }
915
916                 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
917                 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
918                         zoneConnected = true;
919
920                 Dictionary::Ptr zoneStats = new Dictionary();
921                 zoneStats->Set("connected", zoneConnected);
922                 zoneStats->Set("client_log_lag", zoneLag);
923                 zoneStats->Set("endpoints", zoneEndpoints);
924
925                 String parentZoneName;
926                 Zone::Ptr parentZone = zone->GetParent();
927                 if (parentZone)
928                         parentZoneName = parentZone->GetName();
929
930                 zoneStats->Set("parent_zone", parentZoneName);
931
932                 connectedZones->Set(zone->GetName(), zoneStats);
933         }
934
935         status->Set("num_endpoints", allEndpoints);
936         status->Set("num_conn_endpoints", allConnectedEndpoints->GetLength());
937         status->Set("num_not_conn_endpoints", allNotConnectedEndpoints->GetLength());
938         status->Set("conn_endpoints", allConnectedEndpoints);
939         status->Set("not_conn_endpoints", allNotConnectedEndpoints);
940
941         status->Set("zones", connectedZones);
942
943         perfdata->Set("num_endpoints", allEndpoints);
944         perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
945         perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
946
947         return std::make_pair(status, perfdata);
948 }
949
950 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
951 {
952         double remoteLogPosition = endpoint->GetRemoteLogPosition();
953         double eplag = Utility::GetTime() - remoteLogPosition;
954
955         if ((endpoint->GetSyncing() || !endpoint->IsConnected()) && remoteLogPosition != 0)
956                 return eplag;
957
958         return 0;
959 }
960
961 void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
962 {
963         ObjectLock olock(this);
964         m_AnonymousClients.insert(aclient);
965 }
966
967 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
968 {
969         ObjectLock olock(this);
970         m_AnonymousClients.erase(aclient);
971 }
972
973 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
974 {
975         ObjectLock olock(this);
976         return m_AnonymousClients;
977 }
978
979 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
980 {
981         ObjectLock olock(this);
982         m_HttpClients.insert(aclient);
983 }
984
985 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
986 {
987         ObjectLock olock(this);
988         m_HttpClients.erase(aclient);
989 }
990
991 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
992 {
993         ObjectLock olock(this);
994         return m_HttpClients;
995 }
996
997 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
998 {
999         return Empty;
1000 }