]> granicus.if.org Git - icinga2/blobdiff - lib/remote/apilistener.cpp
Don't throw an exception when replaying the current replay log file
[icinga2] / lib / remote / apilistener.cpp
index aa2f8d401f40edac1b32fc4e6c0b3cc4d6ccde49..f1288dcafd73bf4c44f3a04557e19f47681ffbc3 100644 (file)
@@ -1,6 +1,6 @@
 /******************************************************************************
  * Icinga 2                                                                   *
- * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
+ * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org)    *
  *                                                                            *
  * This program is free software; you can redistribute it and/or              *
  * modify it under the terms of the GNU General Public License                *
  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
  ******************************************************************************/
 
-#include "remote/apilistener.h"
-#include "remote/apiclient.h"
-#include "remote/endpoint.h"
-#include "remote/jsonrpc.h"
-#include "base/convert.h"
-#include "base/netstring.h"
-#include "base/dynamictype.h"
-#include "base/logger_fwd.h"
-#include "base/objectlock.h"
-#include "base/stdiostream.h"
-#include "base/networkstream.h"
-#include "base/zlibstream.h"
-#include "base/application.h"
-#include "base/context.h"
-#include "base/statsfunction.h"
+#include "remote/apilistener.hpp"
+#include "remote/apilistener.tcpp"
+#include "remote/jsonrpcconnection.hpp"
+#include "remote/endpoint.hpp"
+#include "remote/jsonrpc.hpp"
+#include "remote/apifunction.hpp"
+#include "base/convert.hpp"
+#include "base/netstring.hpp"
+#include "base/json.hpp"
+#include "base/configtype.hpp"
+#include "base/logger.hpp"
+#include "base/objectlock.hpp"
+#include "base/stdiostream.hpp"
+#include "base/application.hpp"
+#include "base/context.hpp"
+#include "base/statsfunction.hpp"
+#include "base/exception.hpp"
 #include <fstream>
 
 using namespace icinga;
@@ -40,22 +42,56 @@ REGISTER_TYPE(ApiListener);
 
 boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
 
-REGISTER_STATSFUNCTION(ApiListenerStats, &ApiListener::StatsFunc);
+REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
+
+REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
+
+ApiListener::ApiListener(void)
+       : m_LogMessageCount(0)
+{ }
 
 void ApiListener::OnConfigLoaded(void)
 {
        /* set up SSL context */
-       shared_ptr<X509> cert = GetX509Certificate(GetCertPath());
-       SetIdentity(GetCertificateCN(cert));
-       Log(LogInformation, "remote", "My API identity: " + GetIdentity());
+       boost::shared_ptr<X509> cert;
+       try {
+               cert = GetX509Certificate(GetCertPath());
+       } catch (const std::exception&) {
+               BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
+                   + GetCertPath() + "'.", GetDebugInfo()));
+       }
 
-       m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
+       try {
+               SetIdentity(GetCertificateCN(cert));
+       } catch (const std::exception&) {
+               BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
+                   + GetCertPath() + "'.", GetDebugInfo()));
+       }
 
-       if (!GetCrlPath().IsEmpty())
-               AddCRLToSSLContext(m_SSLContext, GetCrlPath());
+       Log(LogInformation, "ApiListener")
+           << "My API identity: " << GetIdentity();
 
+       try {
+               m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
+       } catch (const std::exception&) {
+               BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
+                   + GetCertPath() + "' key path: '" + GetKeyPath() + "' ca path: '" + GetCaPath() + "'.", GetDebugInfo()));
+       }
+
+       if (!GetCrlPath().IsEmpty()) {
+               try {
+                       AddCRLToSSLContext(m_SSLContext, GetCrlPath());
+               } catch (const std::exception&) {
+                       BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
+                           + GetCrlPath() + "'.", GetDebugInfo()));
+               }
+       }
+}
+
+void ApiListener::OnAllConfigLoaded(void)
+{
        if (!Endpoint::GetByName(GetIdentity()))
-               BOOST_THROW_EXCEPTION(std::runtime_error("Endpoint object for '" + GetIdentity() + "' is missing."));
+               BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
 }
 
 /**
@@ -63,10 +99,15 @@ void ApiListener::OnConfigLoaded(void)
  */
 void ApiListener::Start(void)
 {
-       if (std::distance(DynamicType::GetObjects<ApiListener>().first, DynamicType::GetObjects<ApiListener>().second) > 1)
-               BOOST_THROW_EXCEPTION(std::runtime_error("Only one ApiListener object is allowed."));
+       SyncZoneDirs();
+
+       if (std::distance(ConfigType::GetObjectsByType<ApiListener>().first,
+           ConfigType::GetObjectsByType<ApiListener>().second) > 1) {
+               Log(LogCritical, "ApiListener", "Only one ApiListener object is allowed.");
+               return;
+       }
 
-       DynamicObject::Start();
+       ObjectImpl<ApiListener>::Start();
 
        {
                boost::mutex::scoped_lock(m_LogLock);
@@ -75,9 +116,13 @@ void ApiListener::Start(void)
        }
 
        /* create the primary JSON-RPC listener */
-       AddListener(GetBindPort());
+       if (!AddListener(GetBindHost(), GetBindPort())) {
+               Log(LogCritical, "ApiListener")
+                    << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
+               Application::Exit(EXIT_FAILURE);
+       }
 
-       m_Timer = make_shared<Timer>();
+       m_Timer = new Timer();
        m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
        m_Timer->SetInterval(5);
        m_Timer->Start();
@@ -88,13 +133,13 @@ void ApiListener::Start(void)
 
 ApiListener::Ptr ApiListener::GetInstance(void)
 {
-       BOOST_FOREACH(const ApiListener::Ptr& listener, DynamicType::GetObjects<ApiListener>())
+       BOOST_FOREACH(const ApiListener::Ptr& listener, ConfigType::GetObjectsByType<ApiListener>())
                return listener;
 
        return ApiListener::Ptr();
 }
 
-shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
+boost::shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
 {
        return m_SSLContext;
 }
@@ -102,6 +147,10 @@ shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
 Endpoint::Ptr ApiListener::GetMaster(void) const
 {
        Zone::Ptr zone = Zone::GetLocalZone();
+
+       if (!zone)
+               return Endpoint::Ptr();
+
        std::vector<String> names;
 
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
@@ -115,34 +164,50 @@ Endpoint::Ptr ApiListener::GetMaster(void) const
 
 bool ApiListener::IsMaster(void) const
 {
-       return GetMaster()->GetName() == GetIdentity();
+       Endpoint::Ptr master = GetMaster();
+
+       if (!master)
+               return false;
+
+       return master->GetName() == GetIdentity();
 }
 
 /**
  * Creates a new JSON-RPC listener on the specified port.
  *
+ * @param node The host the listener should be bound to.
  * @param service The port to listen on.
  */
-void ApiListener::AddListener(const String& service)
+bool ApiListener::AddListener(const String& node, const String& service)
 {
        ObjectLock olock(this);
 
-       shared_ptr<SSL_CTX> sslContext = m_SSLContext;
+       boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
 
-       if (!sslContext)
-               BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddListener()"));
+       if (!sslContext) {
+               Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
+               return false;
+       }
 
-       std::ostringstream s;
-       s << "Adding new listener: port " << service;
-       Log(LogInformation, "agent", s.str());
+       Log(LogInformation, "ApiListener")
+           << "Adding new listener on port '" << service << "'";
 
-       TcpSocket::Ptr server = make_shared<TcpSocket>();
-       server->Bind(service, AF_INET6);
+       TcpSocket::Ptr server = new TcpSocket();
+
+       try {
+               server->Bind(node, service, AF_UNSPEC);
+       } catch (const std::exception&) {
+               Log(LogCritical, "ApiListener")
+                   << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
+               return false;
+       }
 
        boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
        thread.detach();
 
        m_Servers.insert(server);
+
+       return true;
 }
 
 void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
@@ -152,32 +217,67 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
        server->Listen();
 
        for (;;) {
-               Socket::Ptr client = server->Accept();
-
-               Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer));
+               try {
+                       Socket::Ptr client = server->Accept();
+                       boost::thread thread(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
+                       thread.detach();
+               } catch (const std::exception&) {
+                       Log(LogCritical, "ApiListener", "Cannot accept new connection.");
+               }
        }
 }
 
 /**
- * Creates a new JSON-RPC client and connects to the specified host and port.
+ * Creates a new JSON-RPC client and connects to the specified endpoint.
  *
- * @param node The remote host.
- * @param service The remote port.
+ * @param endpoint The endpoint.
  */
-void ApiListener::AddConnection(const String& node, const String& service) {
+void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
+{
        {
                ObjectLock olock(this);
 
-               shared_ptr<SSL_CTX> sslContext = m_SSLContext;
+               boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
 
-               if (!sslContext)
-                       BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
+               if (!sslContext) {
+                       Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
+                       return;
+               }
        }
 
-       TcpSocket::Ptr client = make_shared<TcpSocket>();
+       String host = endpoint->GetHost();
+       String port = endpoint->GetPort();
+
+       Log(LogInformation, "JsonRpcConnection")
+           << "Reconnecting to API endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
+
+       TcpSocket::Ptr client = new TcpSocket();
+
+       try {
+               endpoint->SetConnecting(true);
+               client->Connect(host, port);
+               NewClientHandler(client, endpoint->GetName(), RoleClient);
+               endpoint->SetConnecting(false);
+       } catch (const std::exception& ex) {
+               endpoint->SetConnecting(false);
+               client->Close();
+
+               std::ostringstream info;
+               info << "Cannot connect to host '" << host << "' on port '" << port << "'";
+               Log(LogCritical, "ApiListener", info.str());
+               Log(LogDebug, "ApiListener")
+                   << info.str() << "\n" << DiagnosticInformation(ex);
+       }
+}
 
-       client->Connect(node, service);
-       Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleClient));
+void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
+{
+       try {
+               NewClientHandlerInternal(client, hostname, role);
+       } catch (const std::exception& ex) {
+               Log(LogCritical, "ApiListener")
+                   << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
+       }
 }
 
 /**
@@ -185,7 +285,7 @@ void ApiListener::AddConnection(const String& node, const String& service) {
  *
  * @param client The new client.
  */
-void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole role)
+void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
 {
        CONTEXT("Handling new API client connection");
 
@@ -193,40 +293,111 @@ void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole rol
 
        {
                ObjectLock olock(this);
-               tlsStream = make_shared<TlsStream>(client, role, m_SSLContext);
+               try {
+                       tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
+               } catch (const std::exception&) {
+                       Log(LogCritical, "ApiListener", "Cannot create TLS stream from client connection.");
+                       return;
+               }
+       }
+
+       try {
+               tlsStream->Handshake();
+       } catch (const std::exception& ex) {
+               Log(LogCritical, "ApiListener", "Client TLS handshake failed");
+               return;
        }
 
-       tlsStream->Handshake();
+       boost::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
+       String identity;
+       Endpoint::Ptr endpoint;
+       bool verify_ok = false;
+
+       if (cert) {
+               try {
+                       identity = GetCertificateCN(cert);
+               } catch (const std::exception&) {
+                       Log(LogCritical, "ApiListener")
+                           << "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
+                       return;
+               }
+
+               verify_ok = tlsStream->IsVerifyOK();
 
-       shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
-       String identity = GetCertificateCN(cert);
+               Log(LogInformation, "ApiListener")
+                   << "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
 
-       Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
 
-       Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
+               if (verify_ok)
+                       endpoint = Endpoint::GetByName(identity);
+       } else {
+               Log(LogInformation, "ApiListener")
+                   << "New client connection (no client certificate)";
+       }
 
-       bool need_sync;
+       bool need_sync = false;
 
        if (endpoint)
                need_sync = !endpoint->IsConnected();
 
-       ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
-       aclient->Start();
+       ClientType ctype;
+
+       if (role == RoleClient) {
+               Dictionary::Ptr message = new Dictionary();
+               message->Set("jsonrpc", "2.0");
+               message->Set("method", "icinga::Hello");
+               message->Set("params", new Dictionary());
+               JsonRpc::SendMessage(tlsStream, message);
+               ctype = ClientJsonRpc;
+       } else {
+               tlsStream->WaitForData(5);
+
+               if (!tlsStream->IsDataAvailable()) {
+                       Log(LogWarning, "ApiListener", "No data received on new API connection.");
+                       return;
+               }
 
-       if (endpoint) {
-               if (need_sync) {
-                       {
-                               ObjectLock olock(endpoint);
+               char firstByte;
+               tlsStream->Peek(&firstByte, 1, false);
 
-                               endpoint->SetSyncing(true);
-                       }
+               if (firstByte >= '0' && firstByte <= '9')
+                       ctype = ClientJsonRpc;
+               else
+                       ctype = ClientHttp;
+       }
 
-                       ReplayLog(aclient);
-               }
+       if (ctype == ClientJsonRpc) {
+               Log(LogInformation, "ApiListener", "New JSON-RPC client");
+
+               JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
+               aclient->Start();
+
+               if (endpoint) {
+                       endpoint->AddClient(aclient);
+
+                       /* sync zone file config */
+                       SendConfigUpdate(aclient);
+                       /* sync runtime config */
+                       SendRuntimeConfigObjects(aclient);
+
+                       if (need_sync) {
+                               {
+                                       ObjectLock olock(endpoint);
+
+                                       endpoint->SetSyncing(true);
+                               }
 
-               endpoint->AddClient(aclient);
-       } else
-               AddAnonymousClient(aclient);
+                               ReplayLog(aclient);
+                       }
+               } else
+                       AddAnonymousClient(aclient);
+       } else {
+               Log(LogInformation, "ApiListener", "New HTTP client");
+
+               HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
+               aclient->Start();
+               AddHttpClient(aclient);
+       }
 }
 
 void ApiListener::ApiTimerHandler(void)
@@ -240,7 +411,7 @@ void ApiListener::ApiTimerHandler(void)
        BOOST_FOREACH(int ts, files) {
                bool need = false;
 
-               BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+               BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
                        if (endpoint->GetName() == GetIdentity())
                                continue;
 
@@ -255,47 +426,61 @@ void ApiListener::ApiTimerHandler(void)
 
                if (!need) {
                        String path = GetApiDir() + "log/" + Convert::ToString(ts);
-                       Log(LogInformation, "remote", "Removing old log file: " + path);
+                       Log(LogNotice, "ApiListener")
+                           << "Removing old log file: " << path;
                        (void)unlink(path.CStr());
                }
        }
 
-       if (IsMaster()) {
-               Zone::Ptr my_zone = Zone::GetLocalZone();
-
-               BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
-                       /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
-                       if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
-                               continue;
+       Zone::Ptr my_zone = Zone::GetLocalZone();
 
-                       bool connected = false;
+       BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
+               /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
+               if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
+                       Log(LogDebug, "ApiListener")
+                           << "Not connecting to Zone '" << zone->GetName()
+                           << "' because it's not in the same zone, a parent or a child zone.";
+                       continue;
+               }
 
-                       BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
-                               if (endpoint->IsConnected()) {
-                                       connected = true;
-                                       break;
-                               }
+               BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
+                       /* don't connect to ourselves */
+                       if (endpoint->GetName() == GetIdentity()) {
+                               Log(LogDebug, "ApiListener")
+                                   << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
+                               continue;
                        }
 
-                       /* don't connect to an endpoint if we already have a connection to the zone */
-                       if (connected)
+                       /* don't try to connect to endpoints which don't have a host and port */
+                       if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
+                               Log(LogDebug, "ApiListener")
+                                   << "Not connecting to Endpoint '" << endpoint->GetName()
+                                   << "' because the host/port attributes are missing.";
                                continue;
+                       }
 
-                       BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
-                               /* don't connect to ourselves */
-                               if (endpoint->GetName() == GetIdentity())
-                                       continue;
-
-                               /* don't try to connect to endpoints which don't have a host and port */
-                               if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
-                                       continue;
+                       /* don't try to connect if there's already a connection attempt */
+                       if (endpoint->GetConnecting()) {
+                               Log(LogDebug, "ApiListener")
+                                   << "Not connecting to Endpoint '" << endpoint->GetName()
+                                   << "' because we're already trying to connect to it.";
+                               continue;
+                       }
 
-                               AddConnection(endpoint->GetHost(), endpoint->GetPort());
+                       /* don't try to connect if we're already connected */
+                       if (endpoint->IsConnected()) {
+                               Log(LogDebug, "ApiListener")
+                                   << "Not connecting to Endpoint '" << endpoint->GetName()
+                                   << "' because we're already connected to it.";
+                               continue;
                        }
+
+                       boost::thread thread(boost::bind(&ApiListener::AddConnection, this, endpoint));
+                       thread.detach();
                }
        }
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
                if (!endpoint->IsConnected())
                        continue;
 
@@ -304,50 +489,61 @@ void ApiListener::ApiTimerHandler(void)
                if (ts == 0)
                        continue;
 
-               Dictionary::Ptr lparams = make_shared<Dictionary>();
+               Dictionary::Ptr lparams = new Dictionary();
                lparams->Set("log_position", ts);
 
-               Dictionary::Ptr lmessage = make_shared<Dictionary>();
+               Dictionary::Ptr lmessage = new Dictionary();
                lmessage->Set("jsonrpc", "2.0");
                lmessage->Set("method", "log::SetLogPosition");
                lmessage->Set("params", lparams);
 
-               BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
+               BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
                        client->SendMessage(lmessage);
 
-               Log(LogInformation, "remote", "Setting log position for identity '" + endpoint->GetName() + "': " +
-                       Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts));
+               Log(LogNotice, "ApiListener")
+                   << "Setting log position for identity '" << endpoint->GetName() << "': "
+                   << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
        }
 
-       Log(LogInformation, "remote", "Current master: " + GetMaster()->GetName());
+       Endpoint::Ptr master = GetMaster();
+
+       if (master)
+               Log(LogNotice, "ApiListener")
+                   << "Current zone master: " << master->GetName();
 
        std::vector<String> names;
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>())
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>())
                if (endpoint->IsConnected())
                        names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
 
-       Log(LogInformation, "remote", "Connected endpoints: " + Utility::NaturalJoin(names));
+       Log(LogNotice, "ApiListener")
+           << "Connected endpoints: " << Utility::NaturalJoin(names);
 }
 
-void ApiListener::RelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
+void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
+    const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
 {
-       m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log));
+       m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true);
 }
 
-void ApiListener::PersistMessage(const Dictionary::Ptr& message)
+void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
 {
        double ts = message->Get("ts");
 
        ASSERT(ts != 0);
 
-       Dictionary::Ptr pmessage = make_shared<Dictionary>();
+       Dictionary::Ptr pmessage = new Dictionary();
        pmessage->Set("timestamp", ts);
 
-       pmessage->Set("message", JsonSerialize(message));
+       pmessage->Set("message", JsonEncode(message));
+       Dictionary::Ptr secname = new Dictionary();
+       secname->Set("type", secobj->GetType()->GetName());
+       secname->Set("name", secobj->GetName());
+       pmessage->Set("secobj", secname);
 
        boost::mutex::scoped_lock lock(m_LogLock);
        if (m_LogFile) {
-               NetString::WriteStringToStream(m_LogFile, JsonSerialize(pmessage));
+               NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
                m_LogMessageCount++;
                SetLogMessageTimestamp(ts);
 
@@ -359,18 +555,34 @@ void ApiListener::PersistMessage(const Dictionary::Ptr& message)
        }
 }
 
-void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
+void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
+{
+       ObjectLock olock(endpoint);
+
+       if (!endpoint->GetSyncing()) {
+               Log(LogNotice, "ApiListener")
+                   << "Sending message to '" << endpoint->GetName() << "'";
+
+               BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
+                       client->SendMessage(message);
+       }
+}
+
+
+void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
+    const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
 {
        double ts = Utility::GetTime();
        message->Set("ts", ts);
 
-       Log(LogDebug, "remote", "Relaying '" + message->Get("method") + "' message");
+       Log(LogNotice, "ApiListener")
+           << "Relaying '" << message->Get("method") << "' message";
 
        if (log)
-               m_LogQueue.Enqueue(boost::bind(&ApiListener::PersistMessage, this, message));
+               PersistMessage(message, secobj);
 
-       if (origin.FromZone)
-               message->Set("originZone", origin.FromZone->GetName());
+       if (origin && origin->FromZone)
+               message->Set("originZone", origin->FromZone->GetName());
 
        bool is_master = IsMaster();
        Endpoint::Ptr master = GetMaster();
@@ -379,7 +591,7 @@ void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObj
        std::vector<Endpoint::Ptr> skippedEndpoints;
        std::set<Zone::Ptr> finishedZones;
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+       BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
                /* don't relay messages to ourselves or disconnected endpoints */
                if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
                        continue;
@@ -393,13 +605,13 @@ void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObj
                }
 
                /* don't relay messages back to the endpoint which we got the message from */
-               if (origin.FromClient && endpoint == origin.FromClient->GetEndpoint()) {
+               if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }
 
                /* don't relay messages back to the zone which we got the message from */
-               if (origin.FromZone && target_zone == origin.FromZone) {
+               if (origin && origin->FromZone && target_zone == origin->FromZone) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }
@@ -412,7 +624,7 @@ void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObj
 
                /* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
                if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
-                   secobj->GetZone() != target_zone->GetName()) {
+                   secobj->GetZoneName() != target_zone->GetName()) {
                        skippedEndpoints.push_back(endpoint);
                        continue;
                }
@@ -423,16 +635,7 @@ void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObj
 
                finishedZones.insert(target_zone);
 
-               {
-                       ObjectLock olock(endpoint);
-
-                       if (!endpoint->GetSyncing()) {
-                               Log(LogDebug, "remote", "Sending message to '" + endpoint->GetName() + "'");
-
-                               BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
-                                       client->SendMessage(message);
-                       }
-               }
+               SyncSendMessage(endpoint, message);
        }
 
        BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
@@ -452,16 +655,12 @@ void ApiListener::OpenLogFile(void)
        std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
 
        if (!fp->good()) {
-               Log(LogWarning, "cluster", "Could not open spool file: " + path);
+               Log(LogWarning, "ApiListener")
+                   << "Could not open spool file: " << path;
                return;
        }
 
-       StdioStream::Ptr logStream = make_shared<StdioStream>(fp, true);
-#ifdef HAVE_BIOZLIB
-       m_LogFile = make_shared<ZlibStream>(logStream);
-#else /* HAVE_BIOZLIB */
-       m_LogFile = logStream;
-#endif /* HAVE_BIOZLIB */
+       m_LogFile = new StdioStream(fp, true);
        m_LogMessageCount = 0;
        SetLogMessageTimestamp(Utility::GetTime());
 }
@@ -493,19 +692,21 @@ void ApiListener::LogGlobHandler(std::vector<int>& files, const String& file)
 {
        String name = Utility::BaseName(file);
 
+       if (name == "current")
+               return;
+
        int ts;
 
        try {
                ts = Convert::ToLong(name);
-       }
-       catch (const std::exception&) {
+       } catch (const std::exception&) {
                return;
        }
 
        files.push_back(ts);
 }
 
-void ApiListener::ReplayLog(const ApiClient::Ptr& client)
+void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
 {
        Endpoint::Ptr endpoint = client->GetEndpoint();
 
@@ -513,8 +714,17 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
 
        int count = -1;
        double peer_ts = endpoint->GetLocalLogPosition();
+       double logpos_ts = peer_ts;
        bool last_sync = false;
 
+       Endpoint::Ptr target_endpoint = client->GetEndpoint();
+       ASSERT(target_endpoint);
+
+       Zone::Ptr target_zone = target_endpoint->GetZone();
+
+       if (!target_zone)
+               return;
+
        for (;;) {
                boost::mutex::scoped_lock lock(m_LogLock);
 
@@ -540,27 +750,30 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
                        if (ts < peer_ts)
                                continue;
 
-                       Log(LogInformation, "cluster", "Replaying log: " + path);
+                       Log(LogNotice, "ApiListener")
+                           << "Replaying log: " << path;
 
-                       std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
-                       StdioStream::Ptr logStream = make_shared<StdioStream>(fp, true);
-#ifdef HAVE_BIOZLIB
-                       ZlibStream::Ptr lstream = make_shared<ZlibStream>(logStream);
-#else /* HAVE_BIOZLIB */
-                       Stream::Ptr lstream = logStream;
-#endif /* HAVE_BIOZLIB */
+                       std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
+                       StdioStream::Ptr logStream = new StdioStream(fp, true);
 
                        String message;
+                       StreamReadContext src;
                        while (true) {
                                Dictionary::Ptr pmessage;
 
                                try {
-                                       if (!NetString::ReadStringFromStream(lstream, &message))
+                                       StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
+
+                                       if (srs == StatusEof)
                                                break;
 
-                                       pmessage = JsonDeserialize(message);
+                                       if (srs != StatusNewItem)
+                                               continue;
+
+                                       pmessage = JsonDecode(message);
                                } catch (const std::exception&) {
-                                       Log(LogWarning, "cluster", "Unexpected end-of-file for cluster log: " + path);
+                                       Log(LogWarning, "ApiListener")
+                                           << "Unexpected end-of-file for cluster log: " << path;
 
                                        /* Log files may be incomplete or corrupted. This is perfectly OK. */
                                        break;
@@ -569,16 +782,53 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
                                if (pmessage->Get("timestamp") <= peer_ts)
                                        continue;
 
+                               Dictionary::Ptr secname = pmessage->Get("secobj");
+
+                               if (secname) {
+                                       ConfigType::Ptr dtype = ConfigType::GetByName(secname->Get("type"));
+
+                                       if (!dtype)
+                                               continue;
+
+                                       ConfigObject::Ptr secobj = dtype->GetObject(secname->Get("name"));
+
+                                       if (!secobj)
+                                               continue;
+
+                                       if (!target_zone->CanAccessObject(secobj))
+                                               continue;
+                               }
+
                                NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
                                count++;
 
                                peer_ts = pmessage->Get("timestamp");
+
+                               if (ts > logpos_ts + 10) {
+                                       logpos_ts = ts;
+
+                                       Dictionary::Ptr lparams = new Dictionary();
+                                       lparams->Set("log_position", logpos_ts);
+
+                                       Dictionary::Ptr lmessage = new Dictionary();
+                                       lmessage->Set("jsonrpc", "2.0");
+                                       lmessage->Set("method", "log::SetLogPosition");
+                                       lmessage->Set("params", lparams);
+
+                                       JsonRpc::SendMessage(client->GetStream(), lmessage);
+                               }
                        }
 
-                       lstream->Close();
+                       logStream->Close();
+               }
+
+               if (count > 0) {
+                       Log(LogInformation, "ApiListener")
+                          << "Replayed " << count << " messages.";
                }
 
-               Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
+               Log(LogNotice, "ApiListener")
+                  << "Replayed " << count << " messages.";
 
                if (last_sync) {
                        {
@@ -593,77 +843,158 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
        }
 }
 
-Value ApiListener::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
+void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
 {
-       Dictionary::Ptr nodes = make_shared<Dictionary>();
        std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
 
        ApiListener::Ptr listener = ApiListener::GetInstance();
 
        if (!listener)
-               return 0;
+               return;
 
        stats = listener->GetStatus();
 
-       BOOST_FOREACH(Dictionary::Pair const& kv, stats.second)
-               perfdata->Set("api_" + kv.first, kv.second);
+       ObjectLock olock(stats.second);
+       BOOST_FOREACH(const Dictionary::Pair& kv, stats.second)
+               perfdata->Add("'api_" + kv.first + "'=" + Convert::ToString(kv.second));
 
        status->Set("api", stats.first);
-
-       return 0;
 }
 
 std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
 {
-       Dictionary::Ptr status = make_shared<Dictionary>();
-       Dictionary::Ptr perfdata = make_shared<Dictionary>();
+       Dictionary::Ptr status = new Dictionary();
+       Dictionary::Ptr perfdata = new Dictionary();
 
        /* cluster stats */
        status->Set("identity", GetIdentity());
 
-       double count_endpoints = 0;
-       Array::Ptr not_connected_endpoints = make_shared<Array>();
-       Array::Ptr connected_endpoints = make_shared<Array>();
+       double allEndpoints = 0;
+       Array::Ptr allNotConnectedEndpoints = new Array();
+       Array::Ptr allConnectedEndpoints = new Array();
+
+       Zone::Ptr my_zone = Zone::GetLocalZone();
 
-       BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
-               if (endpoint->GetName() == GetIdentity())
+       Dictionary::Ptr connectedZones = new Dictionary();
+
+       BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
+               /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
+               if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
+                       Log(LogDebug, "ApiListener")
+                           << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
                        continue;
+               }
 
-               count_endpoints++;
+               bool zoneConnected = false;
+               int countZoneEndpoints = 0;
+               double zoneLag = 0;
 
-               if (!endpoint->IsConnected())
-                       not_connected_endpoints->Add(endpoint->GetName());
-               else
-                       connected_endpoints->Add(endpoint->GetName());
+               Array::Ptr zoneEndpoints = new Array();
+
+               BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
+                       zoneEndpoints->Add(endpoint->GetName());
+
+                       if (endpoint->GetName() == GetIdentity())
+                               continue;
+
+                       double eplag = CalculateZoneLag(endpoint);
+
+                       if (eplag > 0 && eplag > zoneLag)
+                               zoneLag = eplag;
+
+                       allEndpoints++;
+                       countZoneEndpoints++;
+
+                       if (!endpoint->IsConnected()) {
+                               allNotConnectedEndpoints->Add(endpoint->GetName());
+                       } else {
+                               allConnectedEndpoints->Add(endpoint->GetName());
+                               zoneConnected = true;
+                       }
+               }
+
+               /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
+               if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
+                       zoneConnected = true;
+
+               Dictionary::Ptr zoneStats = new Dictionary();
+               zoneStats->Set("connected", zoneConnected);
+               zoneStats->Set("client_log_lag", zoneLag);
+               zoneStats->Set("endpoints", zoneEndpoints);
+
+               String parentZoneName;
+               Zone::Ptr parentZone = zone->GetParent();
+               if (parentZone)
+                       parentZoneName = parentZone->GetName();
+
+               zoneStats->Set("parent_zone", parentZoneName);
+
+               connectedZones->Set(zone->GetName(), zoneStats);
        }
 
-       status->Set("num_endpoints", count_endpoints);
-       status->Set("num_conn_endpoints", connected_endpoints->GetLength());
-       status->Set("num_not_conn_endpoints", not_connected_endpoints->GetLength());
-       status->Set("conn_endpoints", connected_endpoints);
-       status->Set("not_conn_endpoints", not_connected_endpoints);
+       status->Set("num_endpoints", allEndpoints);
+       status->Set("num_conn_endpoints", allConnectedEndpoints->GetLength());
+       status->Set("num_not_conn_endpoints", allNotConnectedEndpoints->GetLength());
+       status->Set("conn_endpoints", allConnectedEndpoints);
+       status->Set("not_conn_endpoints", allNotConnectedEndpoints);
+
+       status->Set("zones", connectedZones);
 
-       perfdata->Set("num_endpoints", count_endpoints);
-       perfdata->Set("num_conn_endpoints", Convert::ToDouble(connected_endpoints->GetLength()));
-       perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(not_connected_endpoints->GetLength()));
+       perfdata->Set("num_endpoints", allEndpoints);
+       perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
+       perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
 
        return std::make_pair(status, perfdata);
 }
 
-void ApiListener::AddAnonymousClient(const ApiClient::Ptr& aclient)
+double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
+{
+       double remoteLogPosition = endpoint->GetRemoteLogPosition();
+       double eplag = Utility::GetTime() - remoteLogPosition;
+
+       if ((endpoint->GetSyncing() || !endpoint->IsConnected()) && remoteLogPosition != 0)
+               return eplag;
+
+       return 0;
+}
+
+void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
 {
        ObjectLock olock(this);
        m_AnonymousClients.insert(aclient);
 }
 
-void ApiListener::RemoveAnonymousClient(const ApiClient::Ptr& aclient)
+void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
 {
        ObjectLock olock(this);
        m_AnonymousClients.erase(aclient);
 }
 
-std::set<ApiClient::Ptr> ApiListener::GetAnonymousClients(void) const
+std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
 {
        ObjectLock olock(this);
        return m_AnonymousClients;
-}
\ No newline at end of file
+}
+
+void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
+{
+       ObjectLock olock(this);
+       m_HttpClients.insert(aclient);
+}
+
+void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
+{
+       ObjectLock olock(this);
+       m_HttpClients.erase(aclient);
+}
+
+std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
+{
+       ObjectLock olock(this);
+       return m_HttpClients;
+}
+
+Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
+{
+       return Empty;
+}