/******************************************************************************
* Icinga 2 *
- * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
+ * Copyright (C) 2012-2015 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
-#include "remote/apilistener.h"
-#include "remote/apiclient.h"
-#include "remote/endpoint.h"
-#include "remote/jsonrpc.h"
-#include "base/convert.h"
-#include "base/netstring.h"
-#include "base/dynamictype.h"
-#include "base/logger_fwd.h"
-#include "base/objectlock.h"
-#include "base/stdiostream.h"
-#include "base/networkstream.h"
-#include "base/zlibstream.h"
-#include "base/application.h"
-#include "base/context.h"
-#include "base/statsfunction.h"
+#include "remote/apilistener.hpp"
+#include "remote/apilistener.tcpp"
+#include "remote/jsonrpcconnection.hpp"
+#include "remote/endpoint.hpp"
+#include "remote/jsonrpc.hpp"
+#include "remote/apifunction.hpp"
+#include "base/convert.hpp"
+#include "base/netstring.hpp"
+#include "base/json.hpp"
+#include "base/configtype.hpp"
+#include "base/logger.hpp"
+#include "base/objectlock.hpp"
+#include "base/stdiostream.hpp"
+#include "base/application.hpp"
+#include "base/context.hpp"
+#include "base/statsfunction.hpp"
+#include "base/exception.hpp"
#include <fstream>
using namespace icinga;
boost::signals2::signal<void(bool)> ApiListener::OnMasterChanged;
-REGISTER_STATSFUNCTION(ApiListenerStats, &ApiListener::StatsFunc);
+REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
+
+REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
+
+ApiListener::ApiListener(void)
+ : m_LogMessageCount(0)
+{ }
void ApiListener::OnConfigLoaded(void)
{
/* set up SSL context */
- shared_ptr<X509> cert = GetX509Certificate(GetCertPath());
- SetIdentity(GetCertificateCN(cert));
- Log(LogInformation, "remote", "My API identity: " + GetIdentity());
+ boost::shared_ptr<X509> cert;
+ try {
+ cert = GetX509Certificate(GetCertPath());
+ } catch (const std::exception&) {
+ BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate from cert path: '"
+ + GetCertPath() + "'.", GetDebugInfo()));
+ }
- m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
+ try {
+ SetIdentity(GetCertificateCN(cert));
+ } catch (const std::exception&) {
+ BOOST_THROW_EXCEPTION(ScriptError("Cannot get certificate common name from cert path: '"
+ + GetCertPath() + "'.", GetDebugInfo()));
+ }
- if (!GetCrlPath().IsEmpty())
- AddCRLToSSLContext(m_SSLContext, GetCrlPath());
+ Log(LogInformation, "ApiListener")
+ << "My API identity: " << GetIdentity();
+ try {
+ m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());
+ } catch (const std::exception&) {
+ BOOST_THROW_EXCEPTION(ScriptError("Cannot make SSL context for cert path: '"
+ + GetCertPath() + "' key path: '" + GetKeyPath() + "' ca path: '" + GetCaPath() + "'.", GetDebugInfo()));
+ }
+
+ if (!GetCrlPath().IsEmpty()) {
+ try {
+ AddCRLToSSLContext(m_SSLContext, GetCrlPath());
+ } catch (const std::exception&) {
+ BOOST_THROW_EXCEPTION(ScriptError("Cannot add certificate revocation list to SSL context for crl path: '"
+ + GetCrlPath() + "'.", GetDebugInfo()));
+ }
+ }
+}
+
+void ApiListener::OnAllConfigLoaded(void)
+{
if (!Endpoint::GetByName(GetIdentity()))
- BOOST_THROW_EXCEPTION(std::runtime_error("Endpoint object for '" + GetIdentity() + "' is missing."));
+ BOOST_THROW_EXCEPTION(ScriptError("Endpoint object for '" + GetIdentity() + "' is missing.", GetDebugInfo()));
}
/**
*/
void ApiListener::Start(void)
{
- if (std::distance(DynamicType::GetObjects<ApiListener>().first, DynamicType::GetObjects<ApiListener>().second) > 1)
- BOOST_THROW_EXCEPTION(std::runtime_error("Only one ApiListener object is allowed."));
+ SyncZoneDirs();
+
+ if (std::distance(ConfigType::GetObjectsByType<ApiListener>().first,
+ ConfigType::GetObjectsByType<ApiListener>().second) > 1) {
+ Log(LogCritical, "ApiListener", "Only one ApiListener object is allowed.");
+ return;
+ }
- DynamicObject::Start();
+ ObjectImpl<ApiListener>::Start();
{
boost::mutex::scoped_lock(m_LogLock);
}
/* create the primary JSON-RPC listener */
- AddListener(GetBindPort());
+ if (!AddListener(GetBindHost(), GetBindPort())) {
+ Log(LogCritical, "ApiListener")
+ << "Cannot add listener on host '" << GetBindHost() << "' for port '" << GetBindPort() << "'.";
+ Application::Exit(EXIT_FAILURE);
+ }
- m_Timer = make_shared<Timer>();
+ m_Timer = new Timer();
m_Timer->OnTimerExpired.connect(boost::bind(&ApiListener::ApiTimerHandler, this));
m_Timer->SetInterval(5);
m_Timer->Start();
ApiListener::Ptr ApiListener::GetInstance(void)
{
- BOOST_FOREACH(const ApiListener::Ptr& listener, DynamicType::GetObjects<ApiListener>())
+ BOOST_FOREACH(const ApiListener::Ptr& listener, ConfigType::GetObjectsByType<ApiListener>())
return listener;
return ApiListener::Ptr();
}
-shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
+boost::shared_ptr<SSL_CTX> ApiListener::GetSSLContext(void) const
{
return m_SSLContext;
}
Endpoint::Ptr ApiListener::GetMaster(void) const
{
Zone::Ptr zone = Zone::GetLocalZone();
+
+ if (!zone)
+ return Endpoint::Ptr();
+
std::vector<String> names;
BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints())
bool ApiListener::IsMaster(void) const
{
- return GetMaster()->GetName() == GetIdentity();
+ Endpoint::Ptr master = GetMaster();
+
+ if (!master)
+ return false;
+
+ return master->GetName() == GetIdentity();
}
/**
* Creates a new JSON-RPC listener on the specified port.
*
+ * @param node The host the listener should be bound to.
* @param service The port to listen on.
*/
-void ApiListener::AddListener(const String& service)
+bool ApiListener::AddListener(const String& node, const String& service)
{
ObjectLock olock(this);
- shared_ptr<SSL_CTX> sslContext = m_SSLContext;
+ boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
- if (!sslContext)
- BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddListener()"));
+ if (!sslContext) {
+ Log(LogCritical, "ApiListener", "SSL context is required for AddListener()");
+ return false;
+ }
- std::ostringstream s;
- s << "Adding new listener: port " << service;
- Log(LogInformation, "agent", s.str());
+ Log(LogInformation, "ApiListener")
+ << "Adding new listener on port '" << service << "'";
- TcpSocket::Ptr server = make_shared<TcpSocket>();
- server->Bind(service, AF_INET6);
+ TcpSocket::Ptr server = new TcpSocket();
+
+ try {
+ server->Bind(node, service, AF_UNSPEC);
+ } catch (const std::exception&) {
+ Log(LogCritical, "ApiListener")
+ << "Cannot bind TCP socket for host '" << node << "' on port '" << service << "'.";
+ return false;
+ }
boost::thread thread(boost::bind(&ApiListener::ListenerThreadProc, this, server));
thread.detach();
m_Servers.insert(server);
+
+ return true;
}
void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
server->Listen();
for (;;) {
- Socket::Ptr client = server->Accept();
-
- Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer));
+ try {
+ Socket::Ptr client = server->Accept();
+ boost::thread thread(boost::bind(&ApiListener::NewClientHandler, this, client, String(), RoleServer));
+ thread.detach();
+ } catch (const std::exception&) {
+ Log(LogCritical, "ApiListener", "Cannot accept new connection.");
+ }
}
}
/**
- * Creates a new JSON-RPC client and connects to the specified host and port.
+ * Creates a new JSON-RPC client and connects to the specified endpoint.
*
- * @param node The remote host.
- * @param service The remote port.
+ * @param endpoint The endpoint.
*/
-void ApiListener::AddConnection(const String& node, const String& service) {
+void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
+{
{
ObjectLock olock(this);
- shared_ptr<SSL_CTX> sslContext = m_SSLContext;
+ boost::shared_ptr<SSL_CTX> sslContext = m_SSLContext;
- if (!sslContext)
- BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
+ if (!sslContext) {
+ Log(LogCritical, "ApiListener", "SSL context is required for AddConnection()");
+ return;
+ }
}
- TcpSocket::Ptr client = make_shared<TcpSocket>();
+ String host = endpoint->GetHost();
+ String port = endpoint->GetPort();
+
+ Log(LogInformation, "JsonRpcConnection")
+ << "Reconnecting to API endpoint '" << endpoint->GetName() << "' via host '" << host << "' and port '" << port << "'";
+
+ TcpSocket::Ptr client = new TcpSocket();
+
+ try {
+ endpoint->SetConnecting(true);
+ client->Connect(host, port);
+ NewClientHandler(client, endpoint->GetName(), RoleClient);
+ endpoint->SetConnecting(false);
+ } catch (const std::exception& ex) {
+ endpoint->SetConnecting(false);
+ client->Close();
+
+ std::ostringstream info;
+ info << "Cannot connect to host '" << host << "' on port '" << port << "'";
+ Log(LogCritical, "ApiListener", info.str());
+ Log(LogDebug, "ApiListener")
+ << info.str() << "\n" << DiagnosticInformation(ex);
+ }
+}
- client->Connect(node, service);
- Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleClient));
+void ApiListener::NewClientHandler(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
+{
+ try {
+ NewClientHandlerInternal(client, hostname, role);
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "ApiListener")
+ << "Exception while handling new API client connection: " << DiagnosticInformation(ex);
+ }
}
/**
*
* @param client The new client.
*/
-void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole role)
+void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const String& hostname, ConnectionRole role)
{
CONTEXT("Handling new API client connection");
{
ObjectLock olock(this);
- tlsStream = make_shared<TlsStream>(client, role, m_SSLContext);
+ try {
+ tlsStream = new TlsStream(client, hostname, role, m_SSLContext);
+ } catch (const std::exception&) {
+ Log(LogCritical, "ApiListener", "Cannot create TLS stream from client connection.");
+ return;
+ }
+ }
+
+ try {
+ tlsStream->Handshake();
+ } catch (const std::exception& ex) {
+ Log(LogCritical, "ApiListener", "Client TLS handshake failed");
+ return;
}
- tlsStream->Handshake();
+ boost::shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
+ String identity;
+ Endpoint::Ptr endpoint;
+ bool verify_ok = false;
+
+ if (cert) {
+ try {
+ identity = GetCertificateCN(cert);
+ } catch (const std::exception&) {
+ Log(LogCritical, "ApiListener")
+ << "Cannot get certificate common name from cert path: '" << GetCertPath() << "'.";
+ return;
+ }
+
+ verify_ok = tlsStream->IsVerifyOK();
- shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
- String identity = GetCertificateCN(cert);
+ Log(LogInformation, "ApiListener")
+ << "New client connection for identity '" << identity << "'" << (verify_ok ? "" : " (unauthenticated)");
- Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
- Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
+ if (verify_ok)
+ endpoint = Endpoint::GetByName(identity);
+ } else {
+ Log(LogInformation, "ApiListener")
+ << "New client connection (no client certificate)";
+ }
- bool need_sync;
+ bool need_sync = false;
if (endpoint)
need_sync = !endpoint->IsConnected();
- ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
- aclient->Start();
+ ClientType ctype;
+
+ if (role == RoleClient) {
+ Dictionary::Ptr message = new Dictionary();
+ message->Set("jsonrpc", "2.0");
+ message->Set("method", "icinga::Hello");
+ message->Set("params", new Dictionary());
+ JsonRpc::SendMessage(tlsStream, message);
+ ctype = ClientJsonRpc;
+ } else {
+ tlsStream->WaitForData(5);
+
+ if (!tlsStream->IsDataAvailable()) {
+ Log(LogWarning, "ApiListener", "No data received on new API connection.");
+ return;
+ }
- if (endpoint) {
- if (need_sync) {
- {
- ObjectLock olock(endpoint);
+ char firstByte;
+ tlsStream->Peek(&firstByte, 1, false);
- endpoint->SetSyncing(true);
- }
+ if (firstByte >= '0' && firstByte <= '9')
+ ctype = ClientJsonRpc;
+ else
+ ctype = ClientHttp;
+ }
- ReplayLog(aclient);
- }
+ if (ctype == ClientJsonRpc) {
+ Log(LogInformation, "ApiListener", "New JSON-RPC client");
+
+ JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, tlsStream, role);
+ aclient->Start();
+
+ if (endpoint) {
+ endpoint->AddClient(aclient);
+
+ /* sync zone file config */
+ SendConfigUpdate(aclient);
+ /* sync runtime config */
+ SendRuntimeConfigObjects(aclient);
+
+ if (need_sync) {
+ {
+ ObjectLock olock(endpoint);
+
+ endpoint->SetSyncing(true);
+ }
- endpoint->AddClient(aclient);
- } else
- AddAnonymousClient(aclient);
+ ReplayLog(aclient);
+ }
+ } else
+ AddAnonymousClient(aclient);
+ } else {
+ Log(LogInformation, "ApiListener", "New HTTP client");
+
+ HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, tlsStream);
+ aclient->Start();
+ AddHttpClient(aclient);
+ }
}
void ApiListener::ApiTimerHandler(void)
BOOST_FOREACH(int ts, files) {
bool need = false;
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
if (endpoint->GetName() == GetIdentity())
continue;
if (!need) {
String path = GetApiDir() + "log/" + Convert::ToString(ts);
- Log(LogInformation, "remote", "Removing old log file: " + path);
+ Log(LogNotice, "ApiListener")
+ << "Removing old log file: " << path;
(void)unlink(path.CStr());
}
}
- if (IsMaster()) {
- Zone::Ptr my_zone = Zone::GetLocalZone();
-
- BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
- /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
- if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
- continue;
+ Zone::Ptr my_zone = Zone::GetLocalZone();
- bool connected = false;
+ BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
+ /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
+ if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
+ Log(LogDebug, "ApiListener")
+ << "Not connecting to Zone '" << zone->GetName()
+ << "' because it's not in the same zone, a parent or a child zone.";
+ continue;
+ }
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
- if (endpoint->IsConnected()) {
- connected = true;
- break;
- }
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
+ /* don't connect to ourselves */
+ if (endpoint->GetName() == GetIdentity()) {
+ Log(LogDebug, "ApiListener")
+ << "Not connecting to Endpoint '" << endpoint->GetName() << "' because that's us.";
+ continue;
}
- /* don't connect to an endpoint if we already have a connection to the zone */
- if (connected)
+ /* don't try to connect to endpoints which don't have a host and port */
+ if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) {
+ Log(LogDebug, "ApiListener")
+ << "Not connecting to Endpoint '" << endpoint->GetName()
+ << "' because the host/port attributes are missing.";
continue;
+ }
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
- /* don't connect to ourselves */
- if (endpoint->GetName() == GetIdentity())
- continue;
-
- /* don't try to connect to endpoints which don't have a host and port */
- if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
- continue;
+ /* don't try to connect if there's already a connection attempt */
+ if (endpoint->GetConnecting()) {
+ Log(LogDebug, "ApiListener")
+ << "Not connecting to Endpoint '" << endpoint->GetName()
+ << "' because we're already trying to connect to it.";
+ continue;
+ }
- AddConnection(endpoint->GetHost(), endpoint->GetPort());
+ /* don't try to connect if we're already connected */
+ if (endpoint->IsConnected()) {
+ Log(LogDebug, "ApiListener")
+ << "Not connecting to Endpoint '" << endpoint->GetName()
+ << "' because we're already connected to it.";
+ continue;
}
+
+ boost::thread thread(boost::bind(&ApiListener::AddConnection, this, endpoint));
+ thread.detach();
}
}
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
if (!endpoint->IsConnected())
continue;
if (ts == 0)
continue;
- Dictionary::Ptr lparams = make_shared<Dictionary>();
+ Dictionary::Ptr lparams = new Dictionary();
lparams->Set("log_position", ts);
- Dictionary::Ptr lmessage = make_shared<Dictionary>();
+ Dictionary::Ptr lmessage = new Dictionary();
lmessage->Set("jsonrpc", "2.0");
lmessage->Set("method", "log::SetLogPosition");
lmessage->Set("params", lparams);
- BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
+ BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
client->SendMessage(lmessage);
- Log(LogInformation, "remote", "Setting log position for identity '" + endpoint->GetName() + "': " +
- Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts));
+ Log(LogNotice, "ApiListener")
+ << "Setting log position for identity '" << endpoint->GetName() << "': "
+ << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts);
}
- Log(LogInformation, "remote", "Current master: " + GetMaster()->GetName());
+ Endpoint::Ptr master = GetMaster();
+
+ if (master)
+ Log(LogNotice, "ApiListener")
+ << "Current zone master: " << master->GetName();
std::vector<String> names;
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>())
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>())
if (endpoint->IsConnected())
names.push_back(endpoint->GetName() + " (" + Convert::ToString(endpoint->GetClients().size()) + ")");
- Log(LogInformation, "remote", "Connected endpoints: " + Utility::NaturalJoin(names));
+ Log(LogNotice, "ApiListener")
+ << "Connected endpoints: " << Utility::NaturalJoin(names);
}
-void ApiListener::RelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
+void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
+ const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
{
- m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log));
+ m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true);
}
-void ApiListener::PersistMessage(const Dictionary::Ptr& message)
+void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)
{
double ts = message->Get("ts");
ASSERT(ts != 0);
- Dictionary::Ptr pmessage = make_shared<Dictionary>();
+ Dictionary::Ptr pmessage = new Dictionary();
pmessage->Set("timestamp", ts);
- pmessage->Set("message", JsonSerialize(message));
+ pmessage->Set("message", JsonEncode(message));
+ Dictionary::Ptr secname = new Dictionary();
+ secname->Set("type", secobj->GetType()->GetName());
+ secname->Set("name", secobj->GetName());
+ pmessage->Set("secobj", secname);
boost::mutex::scoped_lock lock(m_LogLock);
if (m_LogFile) {
- NetString::WriteStringToStream(m_LogFile, JsonSerialize(pmessage));
+ NetString::WriteStringToStream(m_LogFile, JsonEncode(pmessage));
m_LogMessageCount++;
SetLogMessageTimestamp(ts);
}
}
-void ApiListener::SyncRelayMessage(const MessageOrigin& origin, const DynamicObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
+void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
+{
+ ObjectLock olock(endpoint);
+
+ if (!endpoint->GetSyncing()) {
+ Log(LogNotice, "ApiListener")
+ << "Sending message to '" << endpoint->GetName() << "'";
+
+ BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients())
+ client->SendMessage(message);
+ }
+}
+
+
+void ApiListener::SyncRelayMessage(const MessageOrigin::Ptr& origin,
+ const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
{
double ts = Utility::GetTime();
message->Set("ts", ts);
- Log(LogDebug, "remote", "Relaying '" + message->Get("method") + "' message");
+ Log(LogNotice, "ApiListener")
+ << "Relaying '" << message->Get("method") << "' message";
if (log)
- m_LogQueue.Enqueue(boost::bind(&ApiListener::PersistMessage, this, message));
+ PersistMessage(message, secobj);
- if (origin.FromZone)
- message->Set("originZone", origin.FromZone->GetName());
+ if (origin && origin->FromZone)
+ message->Set("originZone", origin->FromZone->GetName());
bool is_master = IsMaster();
Endpoint::Ptr master = GetMaster();
std::vector<Endpoint::Ptr> skippedEndpoints;
std::set<Zone::Ptr> finishedZones;
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
/* don't relay messages to ourselves or disconnected endpoints */
if (endpoint->GetName() == GetIdentity() || !endpoint->IsConnected())
continue;
}
/* don't relay messages back to the endpoint which we got the message from */
- if (origin.FromClient && endpoint == origin.FromClient->GetEndpoint()) {
+ if (origin && origin->FromClient && endpoint == origin->FromClient->GetEndpoint()) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* don't relay messages back to the zone which we got the message from */
- if (origin.FromZone && target_zone == origin.FromZone) {
+ if (origin && origin->FromZone && target_zone == origin->FromZone) {
skippedEndpoints.push_back(endpoint);
continue;
}
/* only relay the message to a) the same zone, b) the parent zone and c) direct child zones */
if (target_zone != my_zone && target_zone != my_zone->GetParent() &&
- secobj->GetZone() != target_zone->GetName()) {
+ secobj->GetZoneName() != target_zone->GetName()) {
skippedEndpoints.push_back(endpoint);
continue;
}
finishedZones.insert(target_zone);
- {
- ObjectLock olock(endpoint);
-
- if (!endpoint->GetSyncing()) {
- Log(LogDebug, "remote", "Sending message to '" + endpoint->GetName() + "'");
-
- BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients())
- client->SendMessage(message);
- }
- }
+ SyncSendMessage(endpoint, message);
}
BOOST_FOREACH(const Endpoint::Ptr& endpoint, skippedEndpoints)
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
if (!fp->good()) {
- Log(LogWarning, "cluster", "Could not open spool file: " + path);
+ Log(LogWarning, "ApiListener")
+ << "Could not open spool file: " << path;
return;
}
- StdioStream::Ptr logStream = make_shared<StdioStream>(fp, true);
-#ifdef HAVE_BIOZLIB
- m_LogFile = make_shared<ZlibStream>(logStream);
-#else /* HAVE_BIOZLIB */
- m_LogFile = logStream;
-#endif /* HAVE_BIOZLIB */
+ m_LogFile = new StdioStream(fp, true);
m_LogMessageCount = 0;
SetLogMessageTimestamp(Utility::GetTime());
}
{
String name = Utility::BaseName(file);
+ if (name == "current")
+ return;
+
int ts;
try {
ts = Convert::ToLong(name);
- }
- catch (const std::exception&) {
+ } catch (const std::exception&) {
return;
}
files.push_back(ts);
}
-void ApiListener::ReplayLog(const ApiClient::Ptr& client)
+void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
{
Endpoint::Ptr endpoint = client->GetEndpoint();
int count = -1;
double peer_ts = endpoint->GetLocalLogPosition();
+ double logpos_ts = peer_ts;
bool last_sync = false;
+ Endpoint::Ptr target_endpoint = client->GetEndpoint();
+ ASSERT(target_endpoint);
+
+ Zone::Ptr target_zone = target_endpoint->GetZone();
+
+ if (!target_zone)
+ return;
+
for (;;) {
boost::mutex::scoped_lock lock(m_LogLock);
if (ts < peer_ts)
continue;
- Log(LogInformation, "cluster", "Replaying log: " + path);
+ Log(LogNotice, "ApiListener")
+ << "Replaying log: " << path;
- std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
- StdioStream::Ptr logStream = make_shared<StdioStream>(fp, true);
-#ifdef HAVE_BIOZLIB
- ZlibStream::Ptr lstream = make_shared<ZlibStream>(logStream);
-#else /* HAVE_BIOZLIB */
- Stream::Ptr lstream = logStream;
-#endif /* HAVE_BIOZLIB */
+ std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
+ StdioStream::Ptr logStream = new StdioStream(fp, true);
String message;
+ StreamReadContext src;
while (true) {
Dictionary::Ptr pmessage;
try {
- if (!NetString::ReadStringFromStream(lstream, &message))
+ StreamReadStatus srs = NetString::ReadStringFromStream(logStream, &message, src);
+
+ if (srs == StatusEof)
break;
- pmessage = JsonDeserialize(message);
+ if (srs != StatusNewItem)
+ continue;
+
+ pmessage = JsonDecode(message);
} catch (const std::exception&) {
- Log(LogWarning, "cluster", "Unexpected end-of-file for cluster log: " + path);
+ Log(LogWarning, "ApiListener")
+ << "Unexpected end-of-file for cluster log: " << path;
/* Log files may be incomplete or corrupted. This is perfectly OK. */
break;
if (pmessage->Get("timestamp") <= peer_ts)
continue;
+ Dictionary::Ptr secname = pmessage->Get("secobj");
+
+ if (secname) {
+ ConfigType::Ptr dtype = ConfigType::GetByName(secname->Get("type"));
+
+ if (!dtype)
+ continue;
+
+ ConfigObject::Ptr secobj = dtype->GetObject(secname->Get("name"));
+
+ if (!secobj)
+ continue;
+
+ if (!target_zone->CanAccessObject(secobj))
+ continue;
+ }
+
NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
count++;
peer_ts = pmessage->Get("timestamp");
+
+ if (ts > logpos_ts + 10) {
+ logpos_ts = ts;
+
+ Dictionary::Ptr lparams = new Dictionary();
+ lparams->Set("log_position", logpos_ts);
+
+ Dictionary::Ptr lmessage = new Dictionary();
+ lmessage->Set("jsonrpc", "2.0");
+ lmessage->Set("method", "log::SetLogPosition");
+ lmessage->Set("params", lparams);
+
+ JsonRpc::SendMessage(client->GetStream(), lmessage);
+ }
}
- lstream->Close();
+ logStream->Close();
+ }
+
+ if (count > 0) {
+ Log(LogInformation, "ApiListener")
+ << "Replayed " << count << " messages.";
}
- Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
+ Log(LogNotice, "ApiListener")
+ << "Replayed " << count << " messages.";
if (last_sync) {
{
}
}
-Value ApiListener::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
+void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
- Dictionary::Ptr nodes = make_shared<Dictionary>();
std::pair<Dictionary::Ptr, Dictionary::Ptr> stats;
ApiListener::Ptr listener = ApiListener::GetInstance();
if (!listener)
- return 0;
+ return;
stats = listener->GetStatus();
- BOOST_FOREACH(Dictionary::Pair const& kv, stats.second)
- perfdata->Set("api_" + kv.first, kv.second);
+ ObjectLock olock(stats.second);
+ BOOST_FOREACH(const Dictionary::Pair& kv, stats.second)
+ perfdata->Add("'api_" + kv.first + "'=" + Convert::ToString(kv.second));
status->Set("api", stats.first);
-
- return 0;
}
std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus(void)
{
- Dictionary::Ptr status = make_shared<Dictionary>();
- Dictionary::Ptr perfdata = make_shared<Dictionary>();
+ Dictionary::Ptr status = new Dictionary();
+ Dictionary::Ptr perfdata = new Dictionary();
/* cluster stats */
status->Set("identity", GetIdentity());
- double count_endpoints = 0;
- Array::Ptr not_connected_endpoints = make_shared<Array>();
- Array::Ptr connected_endpoints = make_shared<Array>();
+ double allEndpoints = 0;
+ Array::Ptr allNotConnectedEndpoints = new Array();
+ Array::Ptr allConnectedEndpoints = new Array();
+
+ Zone::Ptr my_zone = Zone::GetLocalZone();
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- if (endpoint->GetName() == GetIdentity())
+ Dictionary::Ptr connectedZones = new Dictionary();
+
+ BOOST_FOREACH(const Zone::Ptr& zone, ConfigType::GetObjectsByType<Zone>()) {
+ /* only check endpoints in a) the same zone b) our parent zone c) immediate child zones */
+ if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) {
+ Log(LogDebug, "ApiListener")
+ << "Not checking connection to Zone '" << zone->GetName() << "' because it's not in the same zone, a parent or a child zone.";
continue;
+ }
- count_endpoints++;
+ bool zoneConnected = false;
+ int countZoneEndpoints = 0;
+ double zoneLag = 0;
- if (!endpoint->IsConnected())
- not_connected_endpoints->Add(endpoint->GetName());
- else
- connected_endpoints->Add(endpoint->GetName());
+ Array::Ptr zoneEndpoints = new Array();
+
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
+ zoneEndpoints->Add(endpoint->GetName());
+
+ if (endpoint->GetName() == GetIdentity())
+ continue;
+
+ double eplag = CalculateZoneLag(endpoint);
+
+ if (eplag > 0 && eplag > zoneLag)
+ zoneLag = eplag;
+
+ allEndpoints++;
+ countZoneEndpoints++;
+
+ if (!endpoint->IsConnected()) {
+ allNotConnectedEndpoints->Add(endpoint->GetName());
+ } else {
+ allConnectedEndpoints->Add(endpoint->GetName());
+ zoneConnected = true;
+ }
+ }
+
+ /* if there's only one endpoint inside the zone, we're not connected - that's us, fake it */
+ if (zone->GetEndpoints().size() == 1 && countZoneEndpoints == 0)
+ zoneConnected = true;
+
+ Dictionary::Ptr zoneStats = new Dictionary();
+ zoneStats->Set("connected", zoneConnected);
+ zoneStats->Set("client_log_lag", zoneLag);
+ zoneStats->Set("endpoints", zoneEndpoints);
+
+ String parentZoneName;
+ Zone::Ptr parentZone = zone->GetParent();
+ if (parentZone)
+ parentZoneName = parentZone->GetName();
+
+ zoneStats->Set("parent_zone", parentZoneName);
+
+ connectedZones->Set(zone->GetName(), zoneStats);
}
- status->Set("num_endpoints", count_endpoints);
- status->Set("num_conn_endpoints", connected_endpoints->GetLength());
- status->Set("num_not_conn_endpoints", not_connected_endpoints->GetLength());
- status->Set("conn_endpoints", connected_endpoints);
- status->Set("not_conn_endpoints", not_connected_endpoints);
+ status->Set("num_endpoints", allEndpoints);
+ status->Set("num_conn_endpoints", allConnectedEndpoints->GetLength());
+ status->Set("num_not_conn_endpoints", allNotConnectedEndpoints->GetLength());
+ status->Set("conn_endpoints", allConnectedEndpoints);
+ status->Set("not_conn_endpoints", allNotConnectedEndpoints);
+
+ status->Set("zones", connectedZones);
- perfdata->Set("num_endpoints", count_endpoints);
- perfdata->Set("num_conn_endpoints", Convert::ToDouble(connected_endpoints->GetLength()));
- perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(not_connected_endpoints->GetLength()));
+ perfdata->Set("num_endpoints", allEndpoints);
+ perfdata->Set("num_conn_endpoints", Convert::ToDouble(allConnectedEndpoints->GetLength()));
+ perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(allNotConnectedEndpoints->GetLength()));
return std::make_pair(status, perfdata);
}
-void ApiListener::AddAnonymousClient(const ApiClient::Ptr& aclient)
+double ApiListener::CalculateZoneLag(const Endpoint::Ptr& endpoint)
+{
+ double remoteLogPosition = endpoint->GetRemoteLogPosition();
+ double eplag = Utility::GetTime() - remoteLogPosition;
+
+ if ((endpoint->GetSyncing() || !endpoint->IsConnected()) && remoteLogPosition != 0)
+ return eplag;
+
+ return 0;
+}
+
+void ApiListener::AddAnonymousClient(const JsonRpcConnection::Ptr& aclient)
{
ObjectLock olock(this);
m_AnonymousClients.insert(aclient);
}
-void ApiListener::RemoveAnonymousClient(const ApiClient::Ptr& aclient)
+void ApiListener::RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient)
{
ObjectLock olock(this);
m_AnonymousClients.erase(aclient);
}
-std::set<ApiClient::Ptr> ApiListener::GetAnonymousClients(void) const
+std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients(void) const
{
ObjectLock olock(this);
return m_AnonymousClients;
-}
\ No newline at end of file
+}
+
+void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
+{
+ ObjectLock olock(this);
+ m_HttpClients.insert(aclient);
+}
+
+void ApiListener::RemoveHttpClient(const HttpServerConnection::Ptr& aclient)
+{
+ ObjectLock olock(this);
+ m_HttpClients.erase(aclient);
+}
+
+std::set<HttpServerConnection::Ptr> ApiListener::GetHttpClients(void) const
+{
+ ObjectLock olock(this);
+ return m_HttpClients;
+}
+
+Value ApiListener::HelloAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
+{
+ return Empty;
+}