1 /******************************************************************************
3 * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
5 * This program is free software; you can redistribute it and/or *
6 * modify it under the terms of the GNU General Public License *
7 * as published by the Free Software Foundation; either version 2 *
8 * of the License, or (at your option) any later version. *
10 * This program is distributed in the hope that it will be useful, *
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
13 * GNU General Public License for more details. *
15 * You should have received a copy of the GNU General Public License *
16 * along with this program; if not, write to the Free Software Foundation *
17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
18 ******************************************************************************/
20 #include "remote/apilistener.hpp"
21 #include "remote/apilistener.tcpp"
22 #include "remote/jsonrpcconnection.hpp"
23 #include "remote/endpoint.hpp"
24 #include "remote/jsonrpc.hpp"
25 #include "remote/apifunction.hpp"
26 #include "base/convert.hpp"
27 #include "base/netstring.hpp"
28 #include "base/json.hpp"
29 #include "base/configtype.hpp"
30 #include "base/logger.hpp"
31 #include "base/objectlock.hpp"
32 #include "base/stdiostream.hpp"
33 #include "base/application.hpp"
34 #include "base/context.hpp"
35 #include "base/statsfunction.hpp"
36 #include "base/exception.hpp"
/* Bring the icinga namespace into scope for the definitions in this file. */
39 using namespace icinga;
/* Register the ApiListener type through the REGISTER_TYPE macro. */
41 REGISTER_TYPE(ApiListener);
/* Definition of the class-level signal; Start() fires it with `true`. */
43 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
/* Register ApiListener::StatsFunc as the stats function for "ApiListener". */
45 REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
/*
 * Register HelloAPIHandler as an API function (name "Hello", namespace
 * "icinga"). NOTE(review): presumably this is the handler for the
 * "icinga::Hello" method that NewClientHandlerInternal() sends -- confirm
 * against the macro definition.
 */
47 REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
/* Constructs the listener with the log message counter initialised to zero. */
49 ApiListener::ApiListener(void)
50 : m_LogMessageCount(0)
/*
 * Prepares the listener's TLS material: loads the X509 certificate from
 * GetCertPath(), takes the certificate's common name as this node's API
 * identity, builds the SSL context from the certificate, key and CA paths
 * and, if a CRL path is configured, adds the certificate revocation list to
 * that context.
 *
 * Each std::exception handler shown here raises a ScriptError that names the
 * path(s) involved and carries GetDebugInfo().
 */
53 void ApiListener::OnConfigLoaded(void)
55 /* set up SSL context */
56 boost::shared_ptr<X509> cert;
/* Step 1: load the certificate. */
58 cert = GetX509Certificate(GetCertPath());
59 } catch (const std::exception&) {
60 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
61 + GetCertPath() + "'.", GetDebugInfo()));
/* Step 2: the certificate's common name becomes our identity. */
65 SetIdentity(GetCertificateCN(cert));
66 } catch (const std::exception&) {
67 BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
68 + GetCertPath() + "'.", GetDebugInfo()));
71 Log(LogInformation, "ApiListener")
72 << "My API identity: " << GetIdentity();
/* Step 3: create the SSL context that the listener and connection code read from m_SSLContext. */
75 m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
76 } catch (const std::exception&) {
77 BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
78 + GetCertPath() + "' key path: '" + GetKeyPath() + "' ca path: '" + GetCaPath() + "'.", GetDebugInfo()));
/* Step 4 (optional): only when a CRL path has been set. */
81 if (!GetCrlPath().IsEmpty()) {
83 AddCRLToSSLContext(m_SSLContext, GetCrlPath());
84 } catch (const std::exception&) {
85 BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
86 + GetCrlPath() + "'.", GetDebugInfo()));
/*
 * Checks that an Endpoint object named like our own identity exists and
 * throws a ScriptError (with this object's debug info) when it does not.
 */
91 void ApiListener::OnAllConfigLoaded(void)
93 if (!Endpoint::GetByName(GetIdentity()))
94 BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
/**
 * Starts the component.
 */
/*
 * Brings the listener up. Visible steps: log a critical message when more
 * than one ApiListener object is configured, call
 * ObjectImpl<ApiListener>::Start(), take m_LogLock, add the primary listener
 * on GetBindHost()/GetBindPort() (the application exits with EXIT_FAILURE if
 * that fails), set up the timer that drives ApiTimerHandler() and finally
 * fire OnMasterChanged(true).
 */
100 void ApiListener::Start(void)
/* More than one ApiListener object is a configuration error. */
104 if (std::distance(ConfigType::GetObjectsByType<ApiListener>().first,
105 ConfigType::GetObjectsByType<ApiListener>().second) > 1) {
106 Log(LogCritical, "ApiListener", "Only one ApiListener object is allowed.");
110 ObjectImpl<ApiListener>::Start();
/*
 * The lock object needs a name. The previous spelling
 * `boost::mutex::scoped_lock(m_LogLock);` is parsed as the declaration of a
 * new, empty lock variable called m_LogLock, so the mutex was never
 * acquired. This is the same form PersistMessage() and ReplayLog() use.
 */
113 boost::mutex::scoped_lock lock(m_LogLock);
118 /* create the primary JSON-RPC listener */
119 if (!AddListener(GetBindHost(), GetBindPort())) {
120 Log(LogCritical, "ApiListener")
121 << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
122 Application::Exit(EXIT_FAILURE);
/*
 * Periodic housekeeping: ApiTimerHandler() is connected to a timer with an
 * interval of 5. NOTE(review): Reschedule(0) presumably requests an
 * immediate first run -- confirm against Timer.
 */
125 m_Timer = new Timer();
126 m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
127 m_Timer->SetInterval(5);
129 m_Timer->Reschedule(0);
131 OnMasterChanged(true);
/*
 * Looks up the ApiListener instance by iterating the configured ApiListener
 * objects. NOTE(review): presumably the first object found is returned from
 * inside the loop -- confirm.
 */
134 ApiListener::Ptr ApiListener::GetInstance(void)
136 BOOST_FOREACH(const ApiListener::Ptr& listener, ConfigType::GetObjectsByType<ApiListener>())
/* Fallback after the loop: an empty pointer. */
139 return ApiListener::Ptr();
/*
 * Accessor for the listener's SSL context. NOTE(review): presumably returns
 * m_SSLContext, which OnConfigLoaded() creates -- confirm against the body.
 */
142 boost::shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
/*
 * Picks the master endpoint of the local zone: among the zone's endpoints
 * that are either connected or are this node itself, the one whose name
 * sorts first is returned.
 */
147 Endpoint::Ptr ApiListener::GetMaster(void) const
149 Zone::Ptr zone = Zone::GetLocalZone();
/* NOTE(review): early exit with an empty pointer -- presumably taken when there is no local zone; confirm. */
152 return Endpoint::Ptr();
154 std::vector<String> names;
/* Candidates: every connected endpoint, plus our own endpoint regardless of its connection state. */
156 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
157 if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
158 names.push_back(endpoint->GetName());
160 std::sort(names.begin(), names.end());
/*
 * The first name in sorted order wins.
 * NOTE(review): this dereferences names.begin() and so assumes at least one
 * candidate was collected -- confirm that the local endpoint is always part
 * of the local zone.
 */
162 return Endpoint::GetByName(*names.begin());
/*
 * Tells whether this node is the zone master, i.e. whether the endpoint
 * reported by GetMaster() carries our own identity as its name.
 */
165 bool ApiListener::IsMaster(void) const
167 Endpoint::Ptr master = GetMaster();
/* Compare the master's name with our own identity. */
172 return master->GetName() == GetIdentity();
/**
 * Creates a new JSON-RPC listener on the specified port.
 *
 * @param node The host the listener should be bound to.
 * @param service The port to listen on.
 */
/*
 * Binds a TCP socket to node/service, starts a thread running
 * ListenerThreadProc() for it and records the socket in m_Servers.
 * A missing SSL context and a failing bind are both logged as critical.
 */
181 bool ApiListener::AddListener(const String& node, const String& service)
/* Take the object lock before reading m_SSLContext. */
183 ObjectLock olock(this);
185 boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
/* Listening without an SSL context is reported as a critical error. */
188 Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
192 Log(LogInformation, "ApiListener")
193 << "Adding new listener on port '" << service << "'";
195 TcpSocket::Ptr server = new TcpSocket();
/* AF_UNSPEC: no particular address family is requested for the bind. */
198 server->Bind(node, service, AF_UNSPEC);
199 } catch (const std::exception&) {
200 Log(LogCritical, "ApiListener")
201 << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
/* Incoming connections are accepted on a dedicated thread. */
205 boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
/* Remember the listening socket. */
208 m_Servers.insert(server);
/*
 * Thread body for a listening socket. An accepted client is handed to
 * NewClientHandler() on a thread of its own, with an empty host name and the
 * role RoleServer. A failing accept is logged as critical.
 */
213 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
215 Utility::SetThreadName("API Listener");
221 Socket::Ptr client = server->Accept();
222 boost::thread thread(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
224 } catch (const std::exception&) {
225 Log(LogCritical, "ApiListener", "Cannot accept new connection.");
/**
 * Creates a new JSON-RPC client and connects to the specified endpoint.
 *
 * @param endpoint The endpoint.
 */
/*
 * Opens an outgoing TCP connection to the endpoint's host and port and runs
 * NewClientHandler() on it with the role RoleClient. The endpoint's
 * "connecting" flag is set for the duration of the attempt and cleared on
 * both the success and the failure path.
 */
235 void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
/* Take the object lock before reading m_SSLContext. */
238 ObjectLock olock(this);
240 boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
243 Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
248 String host = endpoint->GetHost();
249 String port = endpoint->GetPort();
251 Log(LogInformation, "JsonRpcConnection")
252 << "Reconnecting to API endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
254 TcpSocket::Ptr client = new TcpSocket();
/* ApiTimerHandler() checks this flag and does not start a second attempt while it is set. */
257 endpoint->SetConnecting(true);
258 client->Connect(host, port);
259 NewClientHandler(client, endpoint->GetName(), RoleClient);
260 endpoint->SetConnecting(false);
261 } catch (const std::exception& ex) {
/* Clear the flag on failure too, so that a later timer run can retry. */
262 endpoint->SetConnecting(false);
/* Short message at critical level; the full diagnostics only go to the debug log. */
265 std::ostringstream info;
266 info << "Cannot connect to host '" << host << "' on port '" << port << "'";
267 Log(LogCritical, "ApiListener", info.str());
268 Log(LogDebug, "ApiListener")
269 << info.str() << "\n" << DiagnosticInformation(ex);
/*
 * Wrapper around NewClientHandlerInternal(): a std::exception raised while
 * handling the connection is logged as critical together with its
 * diagnostic information.
 */
273 void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
276 NewClientHandlerInternal(client, hostname, role);
277 } catch (const std::exception& ex) {
278 Log(LogCritical, "ApiListener")
279 << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
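/**
 * Processes a new client connection.
 *
 * @param client The new client.
 * @param hostname The host name handed to the TLS stream.
 * @param role The connection role; a RoleClient connection sends the initial icinga::Hello message.
 */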
/*
 * Wraps the socket in a TLS stream, performs the handshake, derives the
 * peer's identity from its certificate and then registers the connection
 * either as a JSON-RPC client or as an HTTP client.
 */
288 void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
290 CONTEXT("Handling new API client connection");
292 TlsStream::Ptr tlsStream;
/* m_SSLContext is read under the object lock. */
295 ObjectLock olock(this);
297 tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
298 } catch (const std::exception&) {
299 Log(LogCritical, "ApiListener", "Cannot create TLS stream from client connection.");
305 tlsStream->Handshake();
306 } catch (const std::exception& ex) {
307 Log(LogCritical, "ApiListener", "Client TLS handshake failed");
/* The peer certificate provides the identity (its common name). */
311 boost::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
313 Endpoint::Ptr endpoint;
314 bool verify_ok = false;
318 identity = GetCertificateCN(cert);
319 } catch (const std::exception&) {
/*
 * NOTE(review): the message prints our own GetCertPath() although the common
 * name was read from the peer's certificate.
 */
320 Log(LogCritical, "ApiListener")
321 << "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
/* Connections whose certificate did not verify are logged as "(unauthenticated)". */
325 verify_ok = tlsStream->IsVerifyOK();
327 Log(LogInformation, "ApiListener")
328 << "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
/* Map the identity to a configured Endpoint object. */
332 endpoint = Endpoint::GetByName(identity);
334 Log(LogInformation, "ApiListener")
335 << "New client connection (no client certificate)";
/* A sync is needed when the endpoint had no connection before this one. */
338 bool need_sync = false;
341 need_sync = !endpoint->IsConnected();
/* Outgoing connections announce themselves with icinga::Hello and are always JSON-RPC. */
345 if (role == RoleClient) {
346 Dictionary::Ptr message = new Dictionary();
347 message->Set("jsonrpc", "2.0");
348 message->Set("method", "icinga::Hello");
349 message->Set("params", new Dictionary());
350 JsonRpc::SendMessage(tlsStream, message);
351 ctype = ClientJsonRpc;
/*
 * Otherwise the protocol is detected from the first byte the peer sends:
 * WaitForData(5) is called first, then one byte is peeked.
 */
353 tlsStream->WaitForData(5);
355 if (!tlsStream->IsDataAvailable()) {
356 Log(LogWarning, "ApiListener", "No data received on new API connection.");
361 tlsStream->Peek(&firstByte, 1, false);
/*
 * A leading digit selects JSON-RPC. NOTE(review): presumably because the
 * JSON-RPC framing starts with a decimal length prefix -- confirm.
 */
363 if (firstByte >= '0' && firstByte <= '9')
364 ctype = ClientJsonRpc;
369 if (ctype == ClientJsonRpc) {
370 Log(LogInformation, "ApiListener", "New JSON-RPC client");
372 JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
/* Attach the connection to its endpoint and push configuration to it. */
376 endpoint->AddClient(aclient);
378 /* sync zone file config */
379 SendConfigUpdate(aclient);
380 /* sync runtime config */
381 SendRuntimeConfigObjects(aclient);
/* While the syncing flag is set, SyncSendMessage() does not send to this endpoint. */
385 ObjectLock olock(endpoint);
387 endpoint->SetSyncing(true);
/* NOTE(review): presumably the branch for connections without a matching endpoint -- confirm. */
393 AddAnonymousClient(aclient);
395 Log(LogInformation, "ApiListener", "New HTTP client");
397 HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
399 AddHttpClient(aclient);
/*
 * Periodic housekeeping, connected to m_Timer in Start(). It
 *  1. inspects the timestamp-named files under <api dir>/log/ and removes old
 *     ones,
 *  2. starts connection attempts to endpoints in the local, parent and
 *     immediate child zones,
 *  3. sends a log::SetLogPosition message to the clients of connected
 *     endpoints, and
 *  4. logs the current zone master and the connected endpoints.
 */
403 void ApiListener::ApiTimerHandler(void)
405 double now = Utility::GetTime();
/* LogGlobHandler() is the glob callback; it receives the `files` vector by reference. */
407 std::vector<int> files;
408 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
409 std::sort(files.begin(), files.end());
411 BOOST_FOREACH(int ts, files) {
/* Check every endpoint except ourselves against this log file's timestamp. */
414 BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
415 if (endpoint->GetName() == GetIdentity())
/* The file is older than the endpoint's log duration (a negative duration disables this test). */
418 if (endpoint->GetLogDuration() >= 0 && ts < now - endpoint->GetLogDuration())
/* The file is newer than the endpoint's local log position. */
421 if (ts > endpoint->GetLocalLogPosition()) {
/* The result of unlink() is deliberately discarded. */
428 String path = GetApiDir() + "log/" + Convert::ToString(ts);
429 Log(LogNotice, "ApiListener")
430 << "Removing old log file: " << path;
431 (void)unlink(path.CStr());
435 Zone::Ptr my_zone = Zone::GetLocalZone();
437 BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
438 /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
439 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
440 Log(LogDebug, "ApiListener")
441 << "Not connecting to Zone '" << zone->GetName()
442 << "' because it's not in the same zone, a parent or a child zone.";
446 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
447 /* don't connect to ourselves */
448 if (endpoint->GetName() == GetIdentity()) {
449 Log(LogDebug, "ApiListener")
450 << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
454 /* don't try to connect to endpoints which don't have a host and port */
455 if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
456 Log(LogDebug, "ApiListener")
457 << "Not connecting to Endpoint '" << endpoint->GetName()
458 << "' because the host/port attributes are missing.";
462 /* don't try to connect if there's already a connection attempt */
463 if (endpoint->GetConnecting()) {
464 Log(LogDebug, "ApiListener")
465 << "Not connecting to Endpoint '" << endpoint->GetName()
466 << "' because we're already trying to connect to it.";
470 /* don't try to connect if we're already connected */
471 if (endpoint->IsConnected()) {
472 Log(LogDebug, "ApiListener")
473 << "Not connecting to Endpoint '" << endpoint->GetName()
474 << "' because we're already connected to it.";
/* The connection attempt runs on its own thread. */
478 boost::thread thread(boost::bind(&ApiListener::AddConnection, this, endpoint));
/* Report the remote log position to the clients of connected endpoints. */
483 BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
484 if (!endpoint->IsConnected())
487 double ts = endpoint->GetRemoteLogPosition();
492 Dictionary::Ptr lparams = new Dictionary();
493 lparams->Set("log_position", ts);
495 Dictionary::Ptr lmessage = new Dictionary();
496 lmessage->Set("jsonrpc", "2.0");
497 lmessage->Set("method", "log::SetLogPosition");
498 lmessage->Set("params", lparams);
500 BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
501 client->SendMessage(lmessage);
503 Log(LogNotice, "ApiListener")
504 << "Setting log position for identity '" << endpoint->GetName() << "': "
505 << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
/* Status output at notice level: the zone master and the connected endpoints with their client counts. */
508 Endpoint::Ptr master = GetMaster();
511 Log(LogNotice, "ApiListener")
512 << "Current zone master: " << master->GetName();
514 std::vector<String> names;
515 BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>())
516 if (endpoint->IsConnected())
517 names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
519 Log(LogNotice, "ApiListener")
520 << "Connected endpoints: " << Utility::NaturalJoin(names);
/*
 * Queues a call to SyncRelayMessage() with the given arguments on
 * m_RelayQueue; the relaying itself happens when the queue runs the task.
 */
523 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
524 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
526 m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true);
/*
 * Appends the message to the log file. The record written is a JSON-encoded
 * dictionary with the keys "timestamp" (the message's "ts" value), "message"
 * (the JSON-encoded message) and "secobj" (type and name of secobj), written
 * with NetString::WriteStringToStream() while m_LogLock is held.
 */
529 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
531 double ts = message->Get("ts");
535 Dictionary::Ptr pmessage = new Dictionary();
536 pmessage->Set("timestamp", ts);
538 pmessage->Set("message", JsonEncode(message));
/* Only the type and name of the object are stored; ReplayLog() looks the object up again from these. */
539 Dictionary::Ptr secname = new Dictionary();
540 secname->Set("type", secobj->GetType()->GetName());
541 secname->Set("name", secobj->GetName());
542 pmessage->Set("secobj", secname);
544 boost::mutex::scoped_lock lock(m_LogLock);
546 NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
548 SetLogMessageTimestamp(ts);
/* NOTE(review): presumably the log file is rotated once it holds more than 50000 messages -- confirm. */
550 if (m_LogMessageCount > 50000) {
/*
 * Sends the message to every JSON-RPC client of the endpoint, unless the
 * endpoint is currently flagged as syncing. The endpoint is locked first.
 */
558 void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
560 ObjectLock olock(endpoint);
562 if (!endpoint->GetSyncing()) {
563 Log(LogNotice, "ApiListener")
564 << "Sending message to '" << endpoint->GetName() << "'";
566 BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
567 client->SendMessage(message);
/*
 * Stamps the message with the current time ("ts"), persists it via
 * PersistMessage() and relays it to the endpoints that pass the filters
 * below. Endpoints pushed onto skippedEndpoints get their local log position
 * set to the message timestamp at the end.
 */
572 void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
573 const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
575 double ts = Utility::GetTime();
576 message->Set("ts", ts);
578 Log(LogNotice, "ApiListener")
579 << "Relaying '" << message->Get("method") << "' message";
582 PersistMessage(message, secobj);
/* Record the originating zone in the message when it is known. */
584 if (origin && origin->FromZone)
585 message->Set("originZone", origin->FromZone->GetName());
587 bool is_master = IsMaster();
588 Endpoint::Ptr master = GetMaster();
589 Zone::Ptr my_zone = Zone::GetLocalZone();
/* finishedZones holds the zones that already received the message through one of their endpoints. */
591 std::vector<Endpoint::Ptr> skippedEndpoints;
592 std::set<Zone::Ptr> finishedZones;
594 BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
595 /* don't relay messages to ourselves or disconnected endpoints */
596 if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
599 Zone::Ptr target_zone = endpoint->GetZone();
601 /* don't relay the message to the zone through more than one endpoint */
602 if (finishedZones.find(target_zone) != finishedZones.end()) {
603 skippedEndpoints.push_back(endpoint);
607 /* don't relay messages back to the endpoint which we got the message from */
608 if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
609 skippedEndpoints.push_back(endpoint);
613 /* don't relay messages back to the zone which we got the message from */
614 if (origin && origin->FromZone && target_zone == origin->FromZone) {
615 skippedEndpoints.push_back(endpoint);
619 /* only relay message to the master if we're not currently the master */
620 if (!is_master && master != endpoint) {
621 skippedEndpoints.push_back(endpoint);
625 /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
626 if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
627 secobj->GetZoneName() != target_zone->GetName()) {
628 skippedEndpoints.push_back(endpoint);
632 /* only relay messages to zones which have access to the object */
633 if (!target_zone->CanAccessObject(secobj))
/* All filters passed: mark the zone as served and send. */
636 finishedZones.insert(target_zone);
638 SyncSendMessage(endpoint, message);
/* Skipped endpoints have their local log position set to this message's timestamp. */
641 BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
642 endpoint->SetLocalLogPosition(ts);
/*
 * Returns the API state directory: the application's local state directory
 * followed by "/lib/icinga2/api/" (including the trailing slash).
 */
645 String ApiListener::GetApiDir(void)
647 return Application::GetLocalStateDir() + "/lib/icinga2/api/";
650 /* must hold m_LogLock */
/*
 * Opens <api dir>/log/current in append mode, installs it as m_LogFile,
 * resets the message counter and sets the log message timestamp to the
 * current time. A warning is logged when the spool file cannot be opened.
 */
651 void ApiListener::OpenLogFile(void)
653 String path = GetApiDir() + "log/current";
/* out | app: existing content is kept and new records are appended. */
655 std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
658 Log(LogWarning, "ApiListener")
659 << "Could not open spool file: " << path;
/* NOTE(review): the second argument is presumably an "owns stream" flag handing fp over to StdioStream -- confirm. */
663 m_LogFile = new StdioStream(fp, true);
664 m_LogMessageCount = 0;
665 SetLogMessageTimestamp(Utility::GetTime());
668 /* must hold m_LogLock */
/* NOTE(review): presumably the counterpart of OpenLogFile() that closes m_LogFile -- confirm against the body. */
669 void ApiListener::CloseLogFile(void)
678 /* must hold m_LogLock */
/*
 * Renames <api dir>/log/current to <api dir>/log/<N>, where N is the log
 * message timestamp truncated to an int, plus one. The result of rename()
 * is deliberately discarded.
 */
679 void ApiListener::RotateLogFile(void)
681 double ts = GetLogMessageTimestamp();
/* NOTE(review): fallback to the current time -- presumably used when no timestamp has been recorded yet; confirm. */
684 ts = Utility::GetTime();
686 String oldpath = GetApiDir() + "log/current";
687 String newpath = GetApiDir() + "log/" + Convert::ToString(static_cast<int>(ts)+1);
688 (void) rename(oldpath.CStr(), newpath.CStr());
/*
 * Glob callback used when scanning the log directory. It takes the base name
 * of the file, treats the name "current" specially and parses any other name
 * as an integer timestamp.
 * NOTE(review): presumably "current" and unparsable names are skipped and
 * the parsed timestamp is appended to `files` -- confirm.
 */
691 void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
693 String name = Utility::BaseName(file);
695 if (name == "current")
701 ts = Convert::ToLong(name);
702 } catch (const std::exception&) {
/*
 * Replays persisted log messages to a JSON-RPC client. The log files under
 * <api dir>/log/ are read in timestamp order; records newer than the
 * endpoint's local log position, and whose object the endpoint's zone may
 * access, are written to the client's stream. Log-position updates are sent
 * along the way, and the endpoint's syncing flag is cleared afterwards.
 */
709 void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
/* NOTE(review): endpoint is dereferenced in CONTEXT() before the ASSERT below checks the same pointer. */
711 Endpoint::Ptr endpoint = client->GetEndpoint();
713 CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'");
/* peer_ts: timestamp of the newest record the peer is known to have. */
716 double peer_ts = endpoint->GetLocalLogPosition();
717 double logpos_ts = peer_ts;
718 bool last_sync = false;
720 Endpoint::Ptr target_endpoint = client->GetEndpoint();
721 ASSERT(target_endpoint);
723 Zone::Ptr target_zone = target_endpoint->GetZone();
/* The log files are accessed while m_LogLock is held. */
729 boost::mutex::scoped_lock lock(m_LogLock);
734 if (count == -1 || count > 50000) {
/* Collect the timestamp-named log files and process them in ascending order. */
743 std::vector<int> files;
744 Utility::Glob(GetApiDir() + "log/*", boost::bind(&ApiListener::LogGlobHandler, boost::ref(files), _1), GlobFile);
745 std::sort(files.begin(), files.end());
747 BOOST_FOREACH(int ts, files) {
748 String path = GetApiDir() + "log/" + Convert::ToString(ts);
753 Log(LogNotice, "ApiListener")
754 << "Replaying log: " << path;
756 std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
757 StdioStream::Ptr logStream = new StdioStream(fp, true);
760 StreamReadContext src;
762 Dictionary::Ptr pmessage;
/* Read one netstring-framed record from the log file. */
765 StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
767 if (srs == StatusEof)
770 if (srs != StatusNewItem)
773 pmessage = JsonDecode(message);
774 } catch (const std::exception&) {
775 Log(LogWarning, "ApiListener")
776 << "Unexpected end-of-file for cluster log: " << path;
778 /* Log files may be incomplete or corrupted. This is perfectly OK. */
/* Records at or before the peer's position are tested here. */
782 if (pmessage->Get("timestamp") <= peer_ts)
/* Resolve the object the record refers to from its stored type and name. */
785 Dictionary::Ptr secname = pmessage->Get("secobj");
788 ConfigType::Ptr dtype = ConfigType::GetByName(secname->Get("type"));
793 ConfigObject::Ptr secobj = dtype->GetObject(secname->Get("name"));
798 if (!target_zone->CanAccessObject(secobj))
/* The stored "message" value is already JSON text, so it is written as is. */
802 NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
805 peer_ts = pmessage->Get("timestamp");
/* Send a log-position update when this file's timestamp is more than 10 ahead of the last reported position. */
807 if (ts > logpos_ts + 10) {
810 Dictionary::Ptr lparams = new Dictionary();
811 lparams->Set("log_position", logpos_ts);
813 Dictionary::Ptr lmessage = new Dictionary();
814 lmessage->Set("jsonrpc", "2.0");
815 lmessage->Set("method", "log::SetLogPosition");
816 lmessage->Set("params", lparams);
818 JsonRpc::SendMessage(client->GetStream(), lmessage);
/* The replay count is reported at information or at notice level. */
826 Log(LogInformation, "ApiListener")
827 << "Replayed " << count << " messages.";
830 Log(LogNotice, "ApiListener")
831 << "Replayed " << count << " messages.";
/* Clearing the syncing flag lets SyncSendMessage() send to this endpoint again. */
835 ObjectLock olock2(endpoint);
836 endpoint->SetSyncing(false);
/*
 * Stats function registered for ApiListener. It takes the listener's
 * GetStatus() result, adds one perfdata entry per performance value in the
 * form 'api_<key>'=<value> and stores the status dictionary under "api".
 */
846 void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
848 std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
850 ApiListener::Ptr listener = ApiListener::GetInstance();
/* first = status dictionary, second = perfdata dictionary (see GetStatus()). */
855 stats = listener->GetStatus();
/* The perfdata dictionary is locked while it is iterated. */
857 ObjectLock olock(stats.second);
858 BOOST_FOREACH(const Dictionary::Pair& kv, stats.second)
859 perfdata->Add("'api_" + kv.first + "'=" + Convert::ToString(kv.second));
861 status->Set("api", stats.first);
/*
 * Builds the listener's status and performance data.
 *
 * @return A pair (status, perfdata). status holds the identity, endpoint
 *         counters, the lists of connected and not connected endpoints and a
 *         per-zone dictionary under "zones"; perfdata holds the three
 *         endpoint counters.
 */
864 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
866 Dictionary::Ptr status = new Dictionary();
867 Dictionary::Ptr perfdata = new Dictionary();
870 status->Set("identity", GetIdentity());
872 double allEndpoints = 0;
873 Array::Ptr allNotConnectedEndpoints = new Array();
874 Array::Ptr allConnectedEndpoints = new Array();
876 Zone::Ptr my_zone = Zone::GetLocalZone();
878 Dictionary::Ptr connectedZones = new Dictionary();
880 BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
881 /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
882 if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
883 Log(LogDebug, "ApiListener")
884 << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
/* Per-zone accumulators. */
888 bool zoneConnected = false;
889 int countZoneEndpoints = 0;
892 Array::Ptr zoneEndpoints = new Array();
894 BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
895 zoneEndpoints->Add(endpoint->GetName());
/* Our own endpoint is listed in zoneEndpoints above; this test singles it out. */
897 if (endpoint->GetName() == GetIdentity())
900 double eplag = CalculateZoneLag(endpoint);
/* NOTE(review): presumably keeps the largest positive endpoint lag as the zone lag -- confirm. */
902 if (eplag > 0 && eplag > zoneLag)
906 countZoneEndpoints++;
908 if (!endpoint->IsConnected()) {
909 allNotConnectedEndpoints->Add(endpoint->GetName());
911 allConnectedEndpoints->Add(endpoint->GetName());
912 zoneConnected = true;
916 /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
917 if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
918 zoneConnected = true;
920 Dictionary::Ptr zoneStats = new Dictionary();
921 zoneStats->Set("connected", zoneConnected);
922 zoneStats->Set("client_log_lag", zoneLag);
923 zoneStats->Set("endpoints", zoneEndpoints);
/* parent_zone is the parent's name; parentZoneName starts out as a default-constructed String. */
925 String parentZoneName;
926 Zone::Ptr parentZone = zone->GetParent();
928 parentZoneName = parentZone->GetName();
930 zoneStats->Set("parent_zone", parentZoneName);
932 connectedZones->Set(zone->GetName(), zoneStats);
935 status->Set("num_endpoints", allEndpoints);
936 status->Set("num_conn_endpoints", allConnectedEndpoints->GetLength());
937 status->Set("num_not_conn_endpoints", allNotConnectedEndpoints->GetLength());
938 status->Set("conn_endpoints", allConnectedEndpoints);
939 status->Set("not_conn_endpoints", allNotConnectedEndpoints);
941 status->Set("zones", connectedZones);
/* The perfdata counters are stored as doubles. */
943 perfdata->Set("num_endpoints", allEndpoints);
944 perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
945 perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
947 return std::make_pair(status, perfdata);
/*
 * Computes the log lag of an endpoint as the current time minus the
 * endpoint's remote log position.
 * NOTE(review): presumably this value is only returned when the condition
 * below holds and 0 is returned otherwise -- confirm against the body.
 */
950 double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
952 double remoteLogPosition = endpoint->GetRemoteLogPosition();
953 double eplag = Utility::GetTime() - remoteLogPosition;
/* The endpoint is syncing or disconnected, and a remote log position is known (non-zero). */
955 if ((endpoint->GetSyncing() || !endpoint->IsConnected()) && remoteLogPosition != 0)
/* Adds the connection to the set of anonymous JSON-RPC clients, under the object lock. */
961 void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
963 ObjectLock olock(this);
964 m_AnonymousClients.insert(aclient);
/* Removes the connection from the set of anonymous JSON-RPC clients, under the object lock. */
967 void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
969 ObjectLock olock(this);
970 m_AnonymousClients.erase(aclient);
/* Returns a copy of the set of anonymous JSON-RPC clients, taken under the object lock. */
973 std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
975 ObjectLock olock(this);
976 return m_AnonymousClients;
/* Adds the connection to the set of HTTP clients, under the object lock. */
979 void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
981 ObjectLock olock(this);
982 m_HttpClients.insert(aclient);
/* Removes the connection from the set of HTTP clients, under the object lock. */
985 void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
987 ObjectLock olock(this);
988 m_HttpClients.erase(aclient);
/* Returns a copy of the set of HTTP clients, taken under the object lock. */
991 std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
993 ObjectLock olock(this);
994 return m_HttpClients;
997 Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)