m_DiscoveryEndpoint->RegisterMethodSource("discovery::RegisterComponent");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::NewComponent",
bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
+ m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
+ bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
GetEndpointManager()->ForeachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
}
+ neea.Endpoint->RegisterMethodSource("discovery::Welcome");
+
/* TODO: implement message broker authorisation */
neea.Endpoint->RegisterMethodSource("discovery::NewComponent");
return 0;
}
- // TODO: send discovery::Welcome message
- // TODO: add subscriptions/provides to this endpoint
+ FinishDiscoverySetup(endpoint);
+
+ return 0;
+}
+
+int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& nrea)
+{
+ Endpoint::Ptr endpoint = nrea.Sender;
+
+ if (endpoint->GetHandshakeCounter() >= 2)
+ return 0;
+
+ endpoint->IncrementHandshakeCounter();
+
+ if (endpoint->GetHandshakeCounter() >= 2) {
+ EventArgs ea;
+ ea.Source = shared_from_this();
+ endpoint->OnSessionEstablished(ea);
+ }
+
return 0;
}
+void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
+{
+ if (endpoint->GetHandshakeCounter() >= 2)
+ return;
+
+ // we assume the other component _always_ wants
+ // discovery::Welcome messages from us
+ endpoint->RegisterMethodSink("discovery::Welcome");
+ JsonRpcRequest request;
+ request.SetMethod("discovery::Welcome");
+ GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request);
+
+ ComponentDiscoveryInfo::Ptr info;
+
+ if (GetComponentDiscoveryInfo(endpoint->GetIdentity(), &info)) {
+ set<string>::iterator i;
+ for (i = info->PublishedMethods.begin(); i != info->PublishedMethods.end(); i++)
+ endpoint->RegisterMethodSource(*i);
+
+ for (i = info->SubscribedMethods.begin(); i != info->SubscribedMethods.end(); i++)
+ endpoint->RegisterMethodSink(*i);
+ }
+
+ endpoint->IncrementHandshakeCounter();
+
+ if (endpoint->GetHandshakeCounter() >= 2) {
+ EventArgs ea;
+ ea.Source = shared_from_this();
+ endpoint->OnSessionEstablished(ea);
+ }
+}
+
void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
{
JsonRpcRequest request;
m_Components[identity] = info;
- SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
+ if (IsBroker())
+ SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
+
+ Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
+ if (endpoint)
+ FinishDiscoverySetup(endpoint);
}
int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
for (i = m_Components.begin(); i != m_Components.end(); i++) {
- if (endpointManager->HasConnectedEndpoint(i->first))
+ Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(i->first);
+ if (endpoint)
continue;
ComponentDiscoveryInfo::Ptr info = i->second;
string m_Identity;
set<string> m_MethodSinks;
set<string> m_MethodSources;
+ unsigned short m_HandshakeCounter;
weak_ptr<EndpointManager> m_EndpointManager;
typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr;
+ Endpoint(void);
+
virtual string GetAddress(void) const = 0;
string GetIdentity(void) const;
void SetIdentity(string identity);
bool HasIdentity(void) const;
+ void IncrementHandshakeCounter();
+ unsigned short GetHandshakeCounter(void) const;
+
shared_ptr<EndpointManager> GetEndpointManager(void) const;
void SetEndpointManager(weak_ptr<EndpointManager> manager);