granicus.if.org Git - icinga2/blob - lib/remote/apilistener.cpp
Implement initial api object sync for newly connected endpoints
[icinga2] / lib / remote / apilistener.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "remote/apilistener.hpp"
21 #include "remote/apilistener.tcpp"
22 #include "remote/jsonrpcconnection.hpp"
23 #include "remote/endpoint.hpp"
24 #include "remote/jsonrpc.hpp"
25 #include "remote/apifunction.hpp"
26 #include "base/convert.hpp"
27 #include "base/netstring.hpp"
28 #include "base/json.hpp"
29 #include "base/configtype.hpp"
30 #include "base/logger.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/stdiostream.hpp"
33 #include "base/application.hpp"
34 #include "base/context.hpp"
35 #include "base/statsfunction.hpp"
36 #include "base/exception.hpp"
37 #include <fstream>
38
39 using namespace icinga;
40
/* Registers the ApiListener class through the project's REGISTER_TYPE macro.
 * NOTE(review): presumably this hooks the class into the config type system - confirm in the macro definition. */
41 REGISTER_TYPE(ApiListener);
42
/* Definition of the static OnMasterChanged signal; Start() fires it with `true`. */
43 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
44
/* Registers ApiListener::StatsFunc under the name ApiListenerStats. */
45 REGISTER_STATSFUNCTION(ApiListenerStats, &ApiListener::StatsFunc);
46
/* Registers ApiListener::HelloAPIHandler as an API function.
 * NOTE(review): presumably this is the handler for the "icinga::Hello" message sent by outgoing connections - confirm. */
47 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
48
49 ApiListener::ApiListener(void)
50         : m_LogMessageCount(0)
51 { }
52
53 void ApiListener::OnConfigLoaded(void)
54 {
55         /* set up SSL context */
56         boost::shared_ptr<X509> cert;
57         try {
58                 cert = GetX509Certificate(GetCertPath());
59         } catch (const std::exception&) {
60                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '" + GetCertPath() + "'.", GetDebugInfo()));
61         }
62
63         try {
64                 SetIdentity(GetCertificateCN(cert));
65         } catch (const std::exception&) {
66                 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '" + GetCertPath() + "'.", GetDebugInfo()));
67         }
68
69         Log(LogInformation, "ApiListener")
70             << "My API identity: " << GetIdentity();
71
72         try {
73                 m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
74         } catch (const std::exception&) {
75                 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '" + GetCertPath() + "' key path: '" + GetKeyPath() + "' ca path: '" + GetCaPath() + "'.", GetDebugInfo()));
76         }
77
78         if (!GetCrlPath().IsEmpty()) {
79                 try {
80                         AddCRLToSSLContext(m_SSLContext, GetCrlPath());
81                 } catch (const std::exception&) {
82                         BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '" + GetCrlPath() + "'.", GetDebugInfo()));
83                 }
84         }
85 }
86
87 void ApiListener::OnAllConfigLoaded(void)
88 {
89         if (!Endpoint::GetByName(GetIdentity()))
90                 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
91 }
92
93 /**
94  * Starts the component.
95  */
96 void ApiListener::Start(void)
97 {
98         SyncZoneDirs();
99
100         if (std::distance(ConfigType::GetObjectsByType<ApiListener>().first, ConfigType::GetObjectsByType<ApiListener>().second) > 1) {
101                 Log(LogCritical, "ApiListener", "Only one ApiListener object is allowed.");
102                 return;
103         }
104
105         ObjectImpl<ApiListener>::Start();
106
107         {
108                 boost::mutex::scoped_lock(m_LogLock);
109                 RotateLogFile();
110                 OpenLogFile();
111         }
112
113         /* create the primary JSON-RPC listener */
114         if (!AddListener(GetBindHost(), GetBindPort())) {
115                 Log(LogCritical, "ApiListener")
116                      << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
117                 Application::Exit(EXIT_FAILURE);
118         }
119
120         m_Timer = new Timer();
121         m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
122         m_Timer->SetInterval(5);
123         m_Timer->Start();
124         m_Timer->Reschedule(0);
125
126         OnMasterChanged(true);
127 }
128
129 ApiListener::Ptr ApiListener::GetInstance(void)
130 {
131         BOOST_FOREACH(const ApiListener::Ptr& listener, ConfigType::GetObjectsByType<ApiListener>())
132                 return listener;
133
134         return ApiListener::Ptr();
135 }
136
137 boost::shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
138 {
139         return m_SSLContext;
140 }
141
142 Endpoint::Ptr ApiListener::GetMaster(void) const
143 {
144         Zone::Ptr zone = Zone::GetLocalZone();
145
146         if (!zone)
147                 return Endpoint::Ptr();
148
149         std::vector<String> names;
150
151         BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
152                 if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
153                         names.push_back(endpoint->GetName());
154
155         std::sort(names.begin(), names.end());
156
157         return Endpoint::GetByName(*names.begin());
158 }
159
160 bool ApiListener::IsMaster(void) const
161 {
162         Endpoint::Ptr master = GetMaster();
163
164         if (!master)
165                 return false;
166
167         return master->GetName() == GetIdentity();
168 }
169
170 /**
171  * Creates a new JSON-RPC listener on the specified port.
172  *
173  * @param node The host the listener should be bound to.
174  * @param service The port to listen on.
175  */
176 bool ApiListener::AddListener(const String& node, const String& service)
177 {
178         ObjectLock olock(this);
179
180         boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
181
182         if (!sslContext) {
183                 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
184                 return false;
185         }
186
187         Log(LogInformation, "ApiListener")
188             << "Adding new listener on port '" << service << "'";
189
190         TcpSocket::Ptr server = new TcpSocket();
191
192         try {
193                 server->Bind(node, service, AF_UNSPEC);
194         } catch (const std::exception&) {
195                 Log(LogCritical, "ApiListener")
196                     << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
197                 return false;
198         }
199
200         boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
201         thread.detach();
202
203         m_Servers.insert(server);
204
205         return true;
206 }
207
208 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
209 {
210         Utility::SetThreadName("API Listener");
211
212         server->Listen();
213
214         for (;;) {
215                 try {
216                         Socket::Ptr client = server->Accept();
217                         boost::thread thread(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
218                         thread.detach();
219                 } catch (const std::exception&) {
220                         Log(LogCritical, "ApiListener", "Cannot accept new connection.");
221                 }
222         }
223 }
224
225 /**
226  * Creates a new JSON-RPC client and connects to the specified endpoint.
227  *
228  * @param endpoint The endpoint.
229  */
230 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
231 {
232         {
233                 ObjectLock olock(this);
234
235                 boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
236
237                 if (!sslContext) {
238                         Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
239                         return;
240                 }
241         }
242
243         String host = endpoint->GetHost();
244         String port = endpoint->GetPort();
245
246         Log(LogInformation, "JsonRpcConnection")
247             << "Reconnecting to API endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
248
249         TcpSocket::Ptr client = new TcpSocket();
250
251         try {
252                 endpoint->SetConnecting(true);
253                 client->Connect(host, port);
254                 NewClientHandler(client, endpoint->GetName(), RoleClient);
255                 endpoint->SetConnecting(false);
256         } catch (const std::exception& ex) {
257                 endpoint->SetConnecting(false);
258                 client->Close();
259
260                 std::ostringstream info;
261                 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
262                 Log(LogCritical, "ApiListener", info.str());
263                 Log(LogDebug, "ApiListener")
264                     << info.str() << "\n" << DiagnosticInformation(ex);
265         }
266 }
267
268 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
269 {
270         try {
271                 NewClientHandlerInternal(client, hostname, role);
272         } catch (const std::exception& ex) {
273                 Log(LogCritical, "ApiListener")
274                     << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
275         }
276 }
277
278 /**
279  * Processes a new client connection.
280  *
281  * @param client The new client.
282  */
283 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
284 {
285         CONTEXT("Handling new API client connection");
286
287         TlsStream::Ptr tlsStream;
288
289         {
290                 ObjectLock olock(this);
291                 try {
292                         tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
293                 } catch (const std::exception&) {
294                         Log(LogCritical, "ApiListener", "Cannot create TLS stream from client connection.");
295                         return;
296                 }
297         }
298
299         try {
300                 tlsStream->Handshake();
301         } catch (const std::exception& ex) {
302                 Log(LogCritical, "ApiListener", "Client TLS handshake failed");
303                 return;
304         }
305
306         boost::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
307         String identity;
308         Endpoint::Ptr endpoint;
309         bool verify_ok = false;
310
311         if (cert) {
312                 try {
313                         identity = GetCertificateCN(cert);
314                 } catch (const std::exception&) {
315                         Log(LogCritical, "ApiListener")
316                             << "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
317                         return;
318                 }
319
320                 verify_ok = tlsStream->IsVerifyOK();
321
322                 Log(LogInformation, "ApiListener")
323                     << "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
324
325
326                 if (verify_ok)
327                         endpoint = Endpoint::GetByName(identity);
328         } else {
329                 Log(LogInformation, "ApiListener")
330                     << "New client connection (no client certificate)";
331         }
332
333         bool need_sync = false;
334
335         if (endpoint)
336                 need_sync = !endpoint->IsConnected();
337
338         ClientType ctype;
339
340         if (role == RoleClient) {
341                 Dictionary::Ptr message = new Dictionary();
342                 message->Set("jsonrpc", "2.0");
343                 message->Set("method", "icinga::Hello");
344                 message->Set("params", new Dictionary());
345                 JsonRpc::SendMessage(tlsStream, message);
346                 ctype = ClientJsonRpc;
347         } else {
348                 tlsStream->WaitForData(5);
349
350                 if (!tlsStream->IsDataAvailable()) {
351                         Log(LogWarning, "ApiListener", "No data received on new API connection.");
352                         return;
353                 }
354
355                 char firstByte;
356                 tlsStream->Peek(&firstByte, 1, false);
357
358                 if (firstByte >= '0' && firstByte <= '9')
359                         ctype = ClientJsonRpc;
360                 else
361                         ctype = ClientHttp;
362         }
363
364         if (ctype == ClientJsonRpc) {
365                 Log(LogInformation, "ApiListener", "New JSON-RPC client");
366
367                 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
368                 aclient->Start();
369
370                 if (endpoint) {
371                         endpoint->AddClient(aclient);
372
373                         if (need_sync) {
374                                 {
375                                         ObjectLock olock(endpoint);
376
377                                         endpoint->SetSyncing(true);
378                                 }
379
380                                 ReplayLog(aclient);
381                         }
382
383                         /* sync zone file config */
384                         SendConfigUpdate(aclient);
385                         /* sync runtime config */
386                         SendRuntimeConfigObjects(aclient);
387                 } else
388                         AddAnonymousClient(aclient);
389         } else {
390                 Log(LogInformation, "ApiListener", "New HTTP client");
391
392                 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
393                 aclient->Start();
394                 AddHttpClient(aclient);
395         }
396 }
397
398 void ApiListener::ApiTimerHandler(void)
399 {
400         double now = Utility::GetTime();
401
402         std::vector<int> files;
403         Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
404         std::sort(files.begin(), files.end());
405
406         BOOST_FOREACH(int ts, files) {
407                 bool need = false;
408
409                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
410                         if (endpoint->GetName() == GetIdentity())
411                                 continue;
412
413                         if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
414                                 continue;
415
416                         if (ts > endpoint->GetLocalLogPosition()) {
417                                 need = true;
418                                 break;
419                         }
420                 }
421
422                 if (!need) {
423                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
424                         Log(LogNotice, "ApiListener")
425                             << "Removing old log file: " << path;
426                         (void)unlink(path.CStr());
427                 }
428         }
429
430         Zone::Ptr my_zone = Zone::GetLocalZone();
431
432         BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
433                 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
434                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
435                         Log(LogDebug, "ApiListener")
436                             << "Not connecting to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
437                         continue;
438                 }
439
440                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
441                         /* don't connect to ourselves */
442                         if (endpoint->GetName() == GetIdentity()) {
443                                 Log(LogDebug, "ApiListener")
444                                     << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
445                                 continue;
446                         }
447
448                         /* don't try to connect to endpoints which don't have a host and port */
449                         if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
450                                 Log(LogDebug, "ApiListener")
451                                     << "Not connecting to Endpoint '" << endpoint->GetName() << "' because the host/port attributes are missing.";
452                                 continue;
453                         }
454
455                         /* don't try to connect if there's already a connection attempt */
456                         if (endpoint->GetConnecting()) {
457                                 Log(LogDebug, "ApiListener")
458                                     << "Not connecting to Endpoint '" << endpoint->GetName() << "' because we're already trying to connect to it.";
459                                 continue;
460                         }
461
462                         /* don't try to connect if we're already connected */
463                         if (endpoint->IsConnected()) {
464                                 Log(LogDebug, "ApiListener")
465                                     << "Not connecting to Endpoint '" << endpoint->GetName() << "' because we're already connected to it.";
466                                 continue;
467                         }
468
469                         boost::thread thread(boost::bind(&ApiListener::AddConnection, this, endpoint));
470                         thread.detach();
471                 }
472         }
473
474         BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
475                 if (!endpoint->IsConnected())
476                         continue;
477
478                 double ts = endpoint->GetRemoteLogPosition();
479
480                 if (ts == 0)
481                         continue;
482
483                 Dictionary::Ptr lparams = new Dictionary();
484                 lparams->Set("log_position", ts);
485
486                 Dictionary::Ptr lmessage = new Dictionary();
487                 lmessage->Set("jsonrpc", "2.0");
488                 lmessage->Set("method", "log::SetLogPosition");
489                 lmessage->Set("params", lparams);
490
491                 BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
492                         client->SendMessage(lmessage);
493
494                 Log(LogNotice, "ApiListener")
495                     << "Setting log position for identity '" << endpoint->GetName() << "': "
496                     << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
497         }
498
499         Endpoint::Ptr master = GetMaster();
500
501         if (master)
502                 Log(LogNotice, "ApiListener")
503                     << "Current zone master: " << master->GetName();
504
505         std::vector<String> names;
506         BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>())
507                 if (endpoint->IsConnected())
508                         names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
509
510         Log(LogNotice, "ApiListener")
511             << "Connected endpoints: " << Utility::NaturalJoin(names);
512 }
513
514 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
515 {
516         m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true);
517 }
518
519 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
520 {
521         double ts = message->Get("ts");
522
523         ASSERT(ts != 0);
524
525         Dictionary::Ptr pmessage = new Dictionary();
526         pmessage->Set("timestamp", ts);
527
528         pmessage->Set("message", JsonEncode(message));
529         
530         Dictionary::Ptr secname = new Dictionary();
531         secname->Set("type", secobj->GetType()->GetName());
532         secname->Set("name", secobj->GetName());
533         pmessage->Set("secobj", secname);
534
535         boost::mutex::scoped_lock lock(m_LogLock);
536         if (m_LogFile) {
537                 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
538                 m_LogMessageCount++;
539                 SetLogMessageTimestamp(ts);
540
541                 if (m_LogMessageCount > 50000) {
542                         CloseLogFile();
543                         RotateLogFile();
544                         OpenLogFile();
545                 }
546         }
547 }
548
549 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
550 {
551         ObjectLock olock(endpoint);
552
553         if (!endpoint->GetSyncing()) {
554                 Log(LogNotice, "ApiListener")
555                     << "Sending message to '" << endpoint->GetName() << "'";
556
557                 BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
558                         client->SendMessage(message);
559         }
560 }
561
562
563 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
564 {
565         double ts = Utility::GetTime();
566         message->Set("ts", ts);
567
568         Log(LogNotice, "ApiListener")
569             << "Relaying '" << message->Get("method") << "' message";
570
571         if (log)
572                 PersistMessage(message, secobj);
573
574         if (origin && origin->FromZone)
575                 message->Set("originZone", origin->FromZone->GetName());
576
577         bool is_master = IsMaster();
578         Endpoint::Ptr master = GetMaster();
579         Zone::Ptr my_zone = Zone::GetLocalZone();
580
581         std::vector<Endpoint::Ptr> skippedEndpoints;
582         std::set<Zone::Ptr> finishedZones;
583
584         BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
585                 /* don't relay messages to ourselves or disconnected endpoints */
586                 if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
587                         continue;
588
589                 Zone::Ptr target_zone = endpoint->GetZone();
590
591                 /* don't relay the message to the zone through more than one endpoint */
592                 if (finishedZones.find(target_zone) != finishedZones.end()) {
593                         skippedEndpoints.push_back(endpoint);
594                         continue;
595                 }
596
597                 /* don't relay messages back to the endpoint which we got the message from */
598                 if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
599                         skippedEndpoints.push_back(endpoint);
600                         continue;
601                 }
602
603                 /* don't relay messages back to the zone which we got the message from */
604                 if (origin && origin->FromZone && target_zone == origin->FromZone) {
605                         skippedEndpoints.push_back(endpoint);
606                         continue;
607                 }
608
609                 /* only relay message to the master if we're not currently the master */
610                 if (!is_master && master != endpoint) {
611                         skippedEndpoints.push_back(endpoint);
612                         continue;
613                 }
614
615                 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
616                 if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
617                     secobj->GetZoneName() != target_zone->GetName()) {
618                         skippedEndpoints.push_back(endpoint);
619                         continue;
620                 }
621
622                 /* only relay messages to zones which have access to the object */
623                 if (!target_zone->CanAccessObject(secobj))
624                         continue;
625
626                 finishedZones.insert(target_zone);
627
628                 SyncSendMessage(endpoint, message);
629         }
630
631         BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
632                 endpoint->SetLocalLogPosition(ts);
633 }
634
635 String ApiListener::GetApiDir(void)
636 {
637         return Application::GetLocalStateDir() + "/lib/icinga2/api/";
638 }
639
640 /* must hold m_LogLock */
641 void ApiListener::OpenLogFile(void)
642 {
643         String path = GetApiDir() + "log/current";
644
645         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
646
647         if (!fp->good()) {
648                 Log(LogWarning, "ApiListener")
649                     << "Could not open spool file: " << path;
650                 return;
651         }
652
653         m_LogFile = new StdioStream(fp, true);
654         m_LogMessageCount = 0;
655         SetLogMessageTimestamp(Utility::GetTime());
656 }
657
658 /* must hold m_LogLock */
659 void ApiListener::CloseLogFile(void)
660 {
661         if (!m_LogFile)
662                 return;
663
664         m_LogFile->Close();
665         m_LogFile.reset();
666 }
667
668 /* must hold m_LogLock */
669 void ApiListener::RotateLogFile(void)
670 {
671         double ts = GetLogMessageTimestamp();
672
673         if (ts == 0)
674                 ts = Utility::GetTime();
675
676         String oldpath = GetApiDir() + "log/current";
677         String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
678         (void) rename(oldpath.CStr(), newpath.CStr());
679 }
680
681 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
682 {
683         String name = Utility::BaseName(file);
684
685         int ts;
686
687         try {
688                 ts = Convert::ToLong(name);
689         }
690         catch (const std::exception&) {
691                 return;
692         }
693
694         files.push_back(ts);
695 }
696
697 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
698 {
699         Endpoint::Ptr endpoint = client->GetEndpoint();
700
701         CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
702
703         int count = -1;
704         double peer_ts = endpoint->GetLocalLogPosition();
705         double logpos_ts = peer_ts;
706         bool last_sync = false;
707         
708         Endpoint::Ptr target_endpoint = client->GetEndpoint();
709         ASSERT(target_endpoint);
710         
711         Zone::Ptr target_zone = target_endpoint->GetZone();
712         
713         if (!target_zone)
714                 return;
715
716         for (;;) {
717                 boost::mutex::scoped_lock lock(m_LogLock);
718
719                 CloseLogFile();
720                 RotateLogFile();
721
722                 if (count == -1 || count > 50000) {
723                         OpenLogFile();
724                         lock.unlock();
725                 } else {
726                         last_sync = true;
727                 }
728
729                 count = 0;
730
731                 std::vector<int> files;
732                 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
733                 std::sort(files.begin(), files.end());
734
735                 BOOST_FOREACH(int ts, files) {
736                         String path = GetApiDir() + "log/" + Convert::ToString(ts);
737
738                         if (ts < peer_ts)
739                                 continue;
740
741                         Log(LogNotice, "ApiListener")
742                             << "Replaying log: " << path;
743
744                         std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
745                         StdioStream::Ptr logStream = new StdioStream(fp, true);
746
747                         String message;
748                         StreamReadContext src;
749                         while (true) {
750                                 Dictionary::Ptr pmessage;
751
752                                 try {
753                                         StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
754
755                                         if (srs == StatusEof)
756                                                 break;
757
758                                         if (srs != StatusNewItem)
759                                                 continue;
760
761                                         pmessage = JsonDecode(message);
762                                 } catch (const std::exception&) {
763                                         Log(LogWarning, "ApiListener")
764                                             << "Unexpected end-of-file for cluster log: " << path;
765
766                                         /* Log files may be incomplete or corrupted. This is perfectly OK. */
767                                         break;
768                                 }
769
770                                 if (pmessage->Get("timestamp") <= peer_ts)
771                                         continue;
772
773                                 Dictionary::Ptr secname = pmessage->Get("secobj");
774                                 
775                                 if (secname) {
776                                         ConfigType::Ptr dtype = ConfigType::GetByName(secname->Get("type"));
777                                         
778                                         if (!dtype)
779                                                 continue;
780                                         
781                                         ConfigObject::Ptr secobj = dtype->GetObject(secname->Get("name"));
782                                         
783                                         if (!secobj)
784                                                 continue;
785                                         
786                                         if (!target_zone->CanAccessObject(secobj))
787                                                 continue;
788                                 }
789
790                                 NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
791                                 count++;
792
793                                 peer_ts = pmessage->Get("timestamp");
794
795                                 if (ts > logpos_ts + 10) {
796                                         logpos_ts = ts;
797
798                                         Dictionary::Ptr lparams = new Dictionary();
799                                         lparams->Set("log_position", logpos_ts);
800
801                                         Dictionary::Ptr lmessage = new Dictionary();
802                                         lmessage->Set("jsonrpc", "2.0");
803                                         lmessage->Set("method", "log::SetLogPosition");
804                                         lmessage->Set("params", lparams);
805
806                                         JsonRpc::SendMessage(client->GetStream(), lmessage);
807                                 }
808                         }
809
810                         logStream->Close();
811                 }
812
813                 if (count > 0) {
814                         Log(LogInformation, "ApiListener")
815                            << "Replayed " << count << " messages.";
816                 }
817
818                 Log(LogNotice, "ApiListener")
819                    << "Replayed " << count << " messages.";
820
821                 if (last_sync) {
822                         {
823                                 ObjectLock olock2(endpoint);
824                                 endpoint->SetSyncing(false);
825                         }
826
827                         OpenLogFile();
828
829                         break;
830                 }
831         }
832 }
833
834 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
835 {
836         Dictionary::Ptr nodes = new Dictionary();
837         std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
838
839         ApiListener::Ptr listener = ApiListener::GetInstance();
840
841         if (!listener)
842                 return;
843
844         stats = listener->GetStatus();
845
846         ObjectLock olock(stats.second);
847         BOOST_FOREACH(const Dictionary::Pair& kv, stats.second)
848                 perfdata->Add("'api_" + kv.first + "'=" + Convert::ToString(kv.second));
849
850         status->Set("api", stats.first);
851 }
852
853 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
854 {
855         Dictionary::Ptr status = new Dictionary();
856         Dictionary::Ptr perfdata = new Dictionary();
857
858         /* cluster stats */
859         status->Set("identity", GetIdentity());
860
861         double count_endpoints = 0;
862         Array::Ptr not_connected_endpoints = new Array();
863         Array::Ptr connected_endpoints = new Array();
864
865         Zone::Ptr my_zone = Zone::GetLocalZone();
866
867         BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
868                 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
869                 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
870                         Log(LogDebug, "ApiListener")
871                             << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
872                         continue;
873                 }
874
875                 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
876                         if (endpoint->GetName() == GetIdentity())
877                                 continue;
878
879                         count_endpoints++;
880
881                         if (!endpoint->IsConnected())
882                                 not_connected_endpoints->Add(endpoint->GetName());
883                         else
884                                 connected_endpoints->Add(endpoint->GetName());
885                 }
886         }
887
888         status->Set("num_endpoints", count_endpoints);
889         status->Set("num_conn_endpoints", connected_endpoints->GetLength());
890         status->Set("num_not_conn_endpoints", not_connected_endpoints->GetLength());
891         status->Set("conn_endpoints", connected_endpoints);
892         status->Set("not_conn_endpoints", not_connected_endpoints);
893
894         perfdata->Set("num_endpoints", count_endpoints);
895         perfdata->Set("num_conn_endpoints", Convert::ToDouble(connected_endpoints->GetLength()));
896         perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(not_connected_endpoints->GetLength()));
897
898         return std::make_pair(status, perfdata);
899 }
900
901 void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
902 {
903         ObjectLock olock(this);
904         m_AnonymousClients.insert(aclient);
905 }
906
907 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
908 {
909         ObjectLock olock(this);
910         m_AnonymousClients.erase(aclient);
911 }
912
913 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
914 {
915         ObjectLock olock(this);
916         return m_AnonymousClients;
917 }
918
919 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
920 {
921         ObjectLock olock(this);
922         m_HttpClients.insert(aclient);
923 }
924
925 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
926 {
927         ObjectLock olock(this);
928         m_HttpClients.erase(aclient);
929 }
930
931 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
932 {
933         ObjectLock olock(this);
934         return m_HttpClients;
935 }
936
937 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
938 {
939         return Empty;
940 }