static Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
-ApiClient::ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role)
- : m_Endpoint(endpoint), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime())
-{ }
+ApiClient::ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role)
+ : m_Identity(identity), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime())
+{
+ m_Endpoint = Endpoint::GetByName(identity);
+}
void ApiClient::StaticInitialize(void)
{
thread.detach();
}
+String ApiClient::GetIdentity(void) const
+{
+ return m_Identity;
+}
+
Endpoint::Ptr ApiClient::GetEndpoint(void) const
{
return m_Endpoint;
m_Seen = Utility::GetTime();
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
- msgbuf << "Error while sending JSON-RPC message for endpoint '" << m_Endpoint->GetName() << "': " << DiagnosticInformation(ex);
+ msgbuf << "Error while sending JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex);
Log(LogWarning, "remote", msgbuf.str());
Disconnect();
void ApiClient::Disconnect(void)
{
- Log(LogWarning, "remote", "API client disconnected for endpoint '" + m_Endpoint->GetName() + "'");
+ Log(LogWarning, "remote", "API client disconnected for identity '" + m_Identity + "'");
m_Stream->Close();
- m_Endpoint->RemoveClient(GetSelf());
+
+ if (m_Endpoint)
+ m_Endpoint->RemoveClient(GetSelf());
}
bool ApiClient::ProcessMessage(void)
if (message->Get("method") != "log::SetLogPosition")
m_Seen = Utility::GetTime();
- if (message->Contains("ts")) {
+ if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");
/* ignore old messages */
MessageOrigin origin;
origin.FromClient = GetSelf();
- if (m_Endpoint->GetZone() != Zone::GetLocalZone())
- origin.FromZone = m_Endpoint->GetZone();
- else
- origin.FromZone = Zone::GetByName(message->Get("originZone"));
+ if (m_Endpoint) {
+ if (m_Endpoint->GetZone() != Zone::GetLocalZone())
+ origin.FromZone = m_Endpoint->GetZone();
+ else
+ origin.FromZone = Zone::GetByName(message->Get("originZone"));
+ }
String method = message->Get("method");
- Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Endpoint->GetName() + "'");
+ Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Identity + "'");
Dictionary::Ptr resultMessage = make_shared<Dictionary>();
Disconnect();
} catch (const std::exception& ex) {
- Log(LogWarning, "remote", "Error while reading JSON-RPC message for endpoint '" + m_Endpoint->GetName() + "': " + DiagnosticInformation(ex));
+ Log(LogWarning, "remote", "Error while reading JSON-RPC message for identity '" + m_Identity + "': " + DiagnosticInformation(ex));
}
}
double log_position = params->Get("log_position");
Endpoint::Ptr endpoint = origin.FromClient->GetEndpoint();
+ if (!endpoint)
+ return Empty;
+
if (log_position > endpoint->GetLocalLogPosition())
endpoint->SetLocalLogPosition(log_position);
public:
DECLARE_PTR_TYPEDEFS(ApiClient);
- ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role);
+ ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role);
static void StaticInitialize(void);
void Start(void);
+ String GetIdentity(void) const;
Endpoint::Ptr GetEndpoint(void) const;
Stream::Ptr GetStream(void) const;
ConnectionRole GetRole(void) const;
void SendMessage(const Dictionary::Ptr& request);
private:
+ String m_Identity;
Endpoint::Ptr m_Endpoint;
Stream::Ptr m_Stream;
ConnectionRole m_Role;
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert);
- Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
+ Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
- if (!endpoint) {
- Log(LogInformation, "remote", "New client for unknown endpoint '" + identity + "'");
- return;
- }
+ Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
- Log(LogInformation, "remote", "New client connection for identity '" + identity + "'");
+ if (endpoint) {
+ bool need_sync = !endpoint->IsConnected();
- bool need_sync = !endpoint->IsConnected();
+ ApiClient::Ptr aclient = make_shared<ApiClient>(identity, tlsStream, role);
+ aclient->Start();
- ApiClient::Ptr aclient = make_shared<ApiClient>(endpoint, tlsStream, role);
- aclient->Start();
+ if (need_sync) {
+ {
+ ObjectLock olock(endpoint);
- if (need_sync) {
- {
- ObjectLock olock(endpoint);
+ endpoint->SetSyncing(true);
+ }
- endpoint->SetSyncing(true);
+ ReplayLog(aclient);
}
- ReplayLog(aclient);
+ endpoint->AddClient(aclient);
}
-
- endpoint->AddClient(aclient);
}
void ApiListener::ApiTimerHandler(void)
if (IsMaster()) {
Zone::Ptr my_zone = Zone::GetLocalZone();
- BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
- if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity())
+ BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects<Zone>()) {
+ /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */
+ if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent())
continue;
- if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
- continue;
+ bool connected = false;
- Zone::Ptr their_zone = endpoint->GetZone();
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
+ if (endpoint->IsConnected()) {
+ connected = true;
+ break;
+ }
+ }
- if (my_zone != their_zone && my_zone != their_zone->GetParent() && their_zone != my_zone->GetParent())
+ /* don't connect to an endpoint if we already have a connection to the zone */
+ if (connected)
continue;
- AddConnection(endpoint->GetHost(), endpoint->GetPort());
+ BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) {
+ /* don't connect to ourselves */
+ if (endpoint->GetName() == GetIdentity())
+ continue;
+
+ /* don't try to connect to endpoints which don't have a host and port */
+ if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty())
+ continue;
+
+ AddConnection(endpoint->GetHost(), endpoint->GetPort());
+ }
}
}