void DelegationComponent::Start(void)
{
m_AllServices = boost::make_shared<ConfigObject::Set>(ConfigObject::GetAllObjects(), ConfigObject::MakeTypePredicate("service"));
-/* m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
- m_AllServices->OnObjectCommitted.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
- m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));*/
m_AllServices->Start();
m_DelegationTimer = boost::make_shared<Timer>();
m_DelegationEndpoint->RegisterPublication("checker::AssignService");
m_DelegationEndpoint->RegisterPublication("checker::ClearServices");
GetEndpointManager()->RegisterEndpoint(m_DelegationEndpoint);
+
+ GetEndpointManager()->OnNewEndpoint.connect(bind(&DelegationComponent::NewEndpointHandler, this, _2));
}
void DelegationComponent::Stop(void)
return candidates;
}
+void DelegationComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
+{
+ endpoint->OnSessionEstablished.connect(bind(&DelegationComponent::SessionEstablishedHandler, this, _1));
+}
+void DelegationComponent::SessionEstablishedHandler(const Endpoint::Ptr& endpoint)
+{
+ stringstream msgbuf;
+ msgbuf << "Clearing assigned services for endpoint '" << endpoint->GetIdentity() << "'";
+ Application::Log(LogInformation, "delegation", msgbuf.str());
+
+ /* locally clear checker for all services that previously belonged to this endpoint */
+ ConfigObject::Set::Iterator it;
+ for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) {
+ Service service = *it;
+
+ if (service.GetChecker() == endpoint->GetIdentity())
+ service.SetChecker("");
+ }
+
+ /* remotely clear services for this endpoint */
+ ClearServices(endpoint);
+}
+
void DelegationComponent::DelegationTimerHandler(void)
{
map<Endpoint::Ptr, int> histogram;
}
if (delegated > 0) {
- // TODO: send clear message when session is established
- // TODO: clear local assignments when session is lost
- need_clear = true; /* remove this once clear messages are properly sent */
if (need_clear) {
map<Endpoint::Ptr, int>::iterator hit;
for (hit = histogram.begin(); hit != histogram.end(); hit++) {
}
stringstream msgbuf;
- msgbuf << "Re-delegated " << delegated << " services";
+ msgbuf << "Updated delegations for " << delegated << " services";
Application::Log(LogInformation, "delegation", msgbuf.str());
}
*/
void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
{
- /* ignore local endpoints */
- if (endpoint->IsLocal())
+ /* immediately finish session setup for local endpoints */
+ if (endpoint->IsLocal()) {
+ endpoint->OnSessionEstablished(endpoint);
return;
+ }
/* accept discovery::RegisterComponent messages from any endpoint */
endpoint->RegisterPublication("discovery::RegisterComponent");
SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
- /* don't send a welcome message for discovery::RegisterComponent messages */
- if (endpoint && trusted)
+ /* don't send a welcome message for discovery::NewComponent messages */
+ if (endpoint && !trusted)
FinishDiscoverySetup(endpoint);
}
if (identity == GetEndpointManager()->GetIdentity())
continue;
+ /* for explicitly-configured upstream endpoints
+ * we prefer to use the node/service from the
+ * config object - which is what the for loop above does */
+ if (ConfigObject::GetObject("endpoint", identity))
+ continue;
+
curr = i;
i++;