void DelegationComponent::Start(void)
{
m_AllServices = boost::make_shared<ConfigObject::Set>(ConfigObject::GetAllObjects(), ConfigObject::MakeTypePredicate("service"));
- m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
+/* m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
m_AllServices->OnObjectCommitted.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
- m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));
+ m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));*/
m_AllServices->Start();
m_DelegationTimer = boost::make_shared<Timer>();
m_DelegationTimer->SetInterval(30);
m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this));
m_DelegationTimer->Start();
+ m_DelegationTimer->Reschedule(0);
m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>();
m_DelegationEndpoint->RegisterPublication("checker::AssignService");
mgr->UnregisterEndpoint(m_DelegationEndpoint);
}
-void DelegationComponent::NewServiceHandler(const Service& object)
-{
- AssignService(object);
-}
-
-void DelegationComponent::RemovedServiceHandler(const Service& object)
-{
- RevokeService(object);
-}
-
-void DelegationComponent::AssignService(const Service& service)
+void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service)
{
RequestMessage request;
request.SetMethod("checker::AssignService");
Application::Log(LogDebug, "delegation", "Trying to delegate service '" + service.GetName() + "'");
- GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, request,
+ GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, checker, request,
boost::bind(&DelegationComponent::AssignServiceResponseHandler, this, service, _2, _5));
}
{
if (timedOut) {
Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' timed out.");
- } else {
- service.SetChecker(sender->GetIdentity());
- Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' was successful.");
+ service.SetChecker("");
}
}
-void DelegationComponent::RevokeService(const Service& service)
+void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
{
}
vector<Endpoint::Ptr> candidates;
EndpointManager::Iterator it;
- for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++)
- candidates.push_back(it->second);
+ for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) {
+ Endpoint::Ptr endpoint = it->second;
+
+ if (!endpoint->HasSubscription("checker::AssignService"))
+ continue;
+
+ candidates.push_back(endpoint);
+ }
return candidates;
}
map<Endpoint::Ptr, int> histogram;
EndpointManager::Iterator eit;
- for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) {
+ for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++)
histogram[eit->second] = 0;
- }
/* nothing to do if we have no checkers */
if (histogram.size() == 0)
std::random_shuffle(services.begin(), services.end());
- long delegated = 0;
+ int delegated = 0;
/* re-assign services */
vector<Service>::iterator sit;
- for (sit = services.begin(); sit != services.end(); it++) {
+ for (sit = services.begin(); sit != services.end(); sit++) {
Service service = *sit;
string checker = service.GetChecker();
oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
+ std::random_shuffle(candidates.begin(), candidates.end());
+
+ stringstream msgbuf;
+ msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
+ Application::Log(LogDebug, "delegation", msgbuf.str());
- long avg_services = 0;
+ int avg_services = 0;
vector<Endpoint::Ptr>::iterator cit;
for (cit = candidates.begin(); cit != candidates.end(); cit++)
avg_services += histogram[*cit];
avg_services /= candidates.size();
- long overflow_tolerance = candidates.size() * 2;
+ int overflow_tolerance = candidates.size() * 2;
/* don't re-assign service if the checker is still valid
* and doesn't have too many services */
assert(!service.GetChecker().empty());
}
+ if (delegated > 0) {
+ map<Endpoint::Ptr, int>::iterator hit;
+ for (hit = histogram.begin(); hit != histogram.end(); hit++) {
+ ClearServices(hit->first);
+ }
+
+ for (sit = services.begin(); sit != services.end(); sit++) {
+ string checker = sit->GetChecker();
+ Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
+
+ if (!endpoint)
+ continue;
+
+ AssignService(endpoint, *sit);
+ }
+ }
+
stringstream msgbuf;
msgbuf << "Re-delegated " << delegated << " services";
Application::Log(LogInformation, "delegation", msgbuf.str());