From bd610a74b1362ac427bf9d80589ae4db01f75e94 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 8 May 2014 15:00:09 +0200 Subject: [PATCH] Clean up reconnect handler. Refs #6107 --- lib/remote/apiclient.cpp | 40 ++++++++++++++++-------- lib/remote/apiclient.h | 4 ++- lib/remote/apilistener.cpp | 59 ++++++++++++++++++++++-------------- lib/remote/messageorigin.cpp | 5 --- lib/remote/messageorigin.h | 1 - 5 files changed, 66 insertions(+), 43 deletions(-) diff --git a/lib/remote/apiclient.cpp b/lib/remote/apiclient.cpp index cc4bb439b..a352c7349 100644 --- a/lib/remote/apiclient.cpp +++ b/lib/remote/apiclient.cpp @@ -39,9 +39,11 @@ INITIALIZE_ONCE(&ApiClient::StaticInitialize); static Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& params); REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler); -ApiClient::ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role) - : m_Endpoint(endpoint), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()) -{ } +ApiClient::ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role) + : m_Identity(identity), m_Stream(stream), m_Role(role), m_Seen(Utility::GetTime()) +{ + m_Endpoint = Endpoint::GetByName(identity); +} void ApiClient::StaticInitialize(void) { @@ -57,6 +59,11 @@ void ApiClient::Start(void) thread.detach(); } +String ApiClient::GetIdentity(void) const +{ + return m_Identity; +} + Endpoint::Ptr ApiClient::GetEndpoint(void) const { return m_Endpoint; @@ -81,7 +88,7 @@ void ApiClient::SendMessage(const Dictionary::Ptr& message) m_Seen = Utility::GetTime(); } catch (const std::exception& ex) { std::ostringstream msgbuf; - msgbuf << "Error while sending JSON-RPC message for endpoint '" << m_Endpoint->GetName() << "': " << DiagnosticInformation(ex); + msgbuf << "Error while sending JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex); Log(LogWarning, "remote", msgbuf.str()); Disconnect(); @@ -90,9 +97,11 @@ void ApiClient::SendMessage(const Dictionary::Ptr& message) void ApiClient::Disconnect(void) { - Log(LogWarning, "remote", "API client disconnected for endpoint '" + m_Endpoint->GetName() + "'"); + Log(LogWarning, "remote", "API client disconnected for identity '" + m_Identity + "'"); m_Stream->Close(); - m_Endpoint->RemoveClient(GetSelf()); + + if (m_Endpoint) + m_Endpoint->RemoveClient(GetSelf()); } bool ApiClient::ProcessMessage(void) @@ -105,7 +114,7 @@ bool ApiClient::ProcessMessage(void) if (message->Get("method") != "log::SetLogPosition") m_Seen = Utility::GetTime(); - if (message->Contains("ts")) { + if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); /* ignore old messages */ @@ -118,14 +127,16 @@ bool ApiClient::ProcessMessage(void) MessageOrigin origin; origin.FromClient = GetSelf(); - if (m_Endpoint->GetZone() != Zone::GetLocalZone()) - origin.FromZone = m_Endpoint->GetZone(); - else - origin.FromZone = Zone::GetByName(message->Get("originZone")); + if (m_Endpoint) { + if (m_Endpoint->GetZone() != Zone::GetLocalZone()) + origin.FromZone = m_Endpoint->GetZone(); + else + origin.FromZone = Zone::GetByName(message->Get("originZone")); + } String method = message->Get("method"); - Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Endpoint->GetName() + "'"); + Log(LogDebug, "remote", "Received '" + method + "' message from '" + m_Identity + "'"); Dictionary::Ptr resultMessage = make_shared(); @@ -159,7 +170,7 @@ void ApiClient::MessageThreadProc(void) Disconnect(); } catch (const std::exception& ex) { - Log(LogWarning, "remote", "Error while reading JSON-RPC message for endpoint '" + m_Endpoint->GetName() + "': " + DiagnosticInformation(ex)); + Log(LogWarning, "remote", "Error while reading JSON-RPC message for identity '" + m_Identity + "': " + DiagnosticInformation(ex)); } } @@ -193,6 +204,9 @@ Value SetLogPositionHandler(const MessageOrigin& origin, const Dictionary::Ptr& double log_position = params->Get("log_position"); Endpoint::Ptr endpoint = origin.FromClient->GetEndpoint(); + if (!endpoint) + return Empty; + if (log_position > endpoint->GetLocalLogPosition()) endpoint->SetLocalLogPosition(log_position); diff --git a/lib/remote/apiclient.h b/lib/remote/apiclient.h index a9e13210e..945313a95 100644 --- a/lib/remote/apiclient.h +++ b/lib/remote/apiclient.h @@ -46,12 +46,13 @@ class I2_REMOTE_API ApiClient : public Object public: DECLARE_PTR_TYPEDEFS(ApiClient); - ApiClient(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream, ConnectionRole role); + ApiClient(const String& identity, const Stream::Ptr& stream, ConnectionRole role); static void StaticInitialize(void); void Start(void); + String GetIdentity(void) const; Endpoint::Ptr GetEndpoint(void) const; Stream::Ptr GetStream(void) const; ConnectionRole GetRole(void) const; @@ -61,6 +62,7 @@ public: void SendMessage(const Dictionary::Ptr& request); private: + String m_Identity; Endpoint::Ptr m_Endpoint; Stream::Ptr m_Stream; ConnectionRole m_Role; diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 4e325b5b2..dbc489cd1 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -201,31 +201,28 @@ void ApiListener::NewClientHandler(const Socket::Ptr& client, ConnectionRole rol shared_ptr cert = tlsStream->GetPeerCertificate(); String identity = GetCertificateCN(cert); - Endpoint::Ptr endpoint = Endpoint::GetByName(identity); + Log(LogInformation, "remote", "New client connection for identity '" + identity + "'"); - if (!endpoint) { - Log(LogInformation, "remote", "New client for unknown endpoint '" + identity + "'"); - return; - } + Endpoint::Ptr endpoint = Endpoint::GetByName(identity); - Log(LogInformation, "remote", "New client connection for identity '" + identity + "'"); + if (endpoint) { + bool need_sync = !endpoint->IsConnected(); - bool need_sync = !endpoint->IsConnected(); + ApiClient::Ptr aclient = make_shared(identity, tlsStream, role); + aclient->Start(); - ApiClient::Ptr aclient = make_shared(endpoint, tlsStream, role); - aclient->Start(); + if (need_sync) { + { + ObjectLock olock(endpoint); - if (need_sync) { - { - ObjectLock olock(endpoint); + endpoint->SetSyncing(true); + } - endpoint->SetSyncing(true); + ReplayLog(aclient); } - ReplayLog(aclient); + endpoint->AddClient(aclient); } - - endpoint->AddClient(aclient); } void ApiListener::ApiTimerHandler(void) @@ -262,19 +259,35 @@ void ApiListener::ApiTimerHandler(void) if (IsMaster()) { Zone::Ptr my_zone = Zone::GetLocalZone(); - BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { - if (endpoint->IsConnected() || endpoint->GetName() == GetIdentity()) + BOOST_FOREACH(const Zone::Ptr& zone, DynamicType::GetObjects()) { + /* only connect to endpoints in a) the same zone b) our parent zone c) immediate child zones */ + if (my_zone != zone && my_zone != zone->GetParent() && zone != my_zone->GetParent()) continue; - if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) - continue; + bool connected = false; - Zone::Ptr their_zone = endpoint->GetZone(); + BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) { + if (endpoint->IsConnected()) { + connected = true; + break; + } + } - if (my_zone != their_zone && my_zone != their_zone->GetParent() && their_zone != my_zone->GetParent()) + /* don't connect to an endpoint if we already have a connection to the zone */ + if (connected) continue; - AddConnection(endpoint->GetHost(), endpoint->GetPort()); + BOOST_FOREACH(const Endpoint::Ptr& endpoint, zone->GetEndpoints()) { + /* don't connect to ourselves */ + if (endpoint->GetName() == GetIdentity()) + continue; + + /* don't try to connect to endpoints which don't have a host and port */ + if (endpoint->GetHost().IsEmpty() || endpoint->GetPort().IsEmpty()) + continue; + + AddConnection(endpoint->GetHost(), endpoint->GetPort()); + } } } diff --git a/lib/remote/messageorigin.cpp b/lib/remote/messageorigin.cpp index 8369e868c..e6a02bf97 100644 --- a/lib/remote/messageorigin.cpp +++ b/lib/remote/messageorigin.cpp @@ -25,8 +25,3 @@ bool MessageOrigin::IsLocal(void) const { return !FromClient; } - -bool MessageOrigin::IsSameZone(void) const -{ - return !FromZone; -} diff --git a/lib/remote/messageorigin.h b/lib/remote/messageorigin.h index d96e64fee..e3ab71358 100644 --- a/lib/remote/messageorigin.h +++ b/lib/remote/messageorigin.h @@ -35,7 +35,6 @@ struct I2_REMOTE_API MessageOrigin ApiClient::Ptr FromClient; bool IsLocal(void) const; - bool IsSameZone(void) const; }; } -- 2.40.0