if (!SSL_CTX_load_verify_locations(sslContext.get(), cakey.c_str(), NULL))
throw OpenSSLException("Could not load public CA key file", ERR_get_error());
+ STACK_OF(X509_NAME) *cert_names;
+
+ cert_names = SSL_load_client_CA_file(cakey.c_str());
+ if (cert_names == NULL)
+ throw OpenSSLException("SSL_load_client_CA_file() failed", ERR_get_error());
+
+ SSL_CTX_set_client_CA_list(sslContext.get(), cert_names);
+
return sslContext;
}
m_CheckerEndpoint = boost::make_shared<VirtualEndpoint>();
m_CheckerEndpoint->RegisterTopicHandler("checker::AssignService",
boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3));
- m_CheckerEndpoint->RegisterTopicHandler("checker::RevokeService",
- boost::bind(&CheckerComponent::RevokeServiceRequestHandler, this, _2, _3));
m_CheckerEndpoint->RegisterTopicHandler("checker::ClearServices",
boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3));
m_CheckerEndpoint->RegisterPublication("checker::CheckResult");
Application::Log(LogDebug, "checker", "Executing service check for '" + service.GetName() + "'");
+ m_PendingServices.insert(service.GetName());
+
CheckTask::Ptr task = CheckTask::CreateTask(service);
task->Enqueue();
Service service = task->GetService();
+ /* if the service isn't in the set of pending services
+ * it was removed and we need to ignore this check result. */
+ if (m_PendingServices.find(service.GetName()) == m_PendingServices.end())
+ continue;
+
CheckResult result = task->GetResult();
Application::Log(LogDebug, "checker", "Got result for service '" + service.GetName() + "'");
failed++;
service.SetNextCheck(now + service.GetCheckInterval());
+ m_PendingServices.erase(service.GetName());
m_Services.push(service);
}
Application::Log(LogDebug, "checker", "Accepted delegation for service '" + service.GetName() + "'");
- /* force a service check */
- m_CheckTimer->Reschedule(0);
-
- string id;
- if (request.GetID(&id)) {
- ResponseMessage rm;
- rm.SetID(id);
-
- MessagePart result;
- rm.SetResult(result);
- GetEndpointManager()->SendUnicastMessage(m_CheckerEndpoint, sender, rm);
- }
-}
-
-void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
-{
- MessagePart params;
- if (!request.GetParams(¶ms))
- return;
-
- string name;
- if (!params.GetProperty("service", &name))
- return;
-
- vector<Service> services;
-
- while (!m_Services.empty()) {
- Service service = m_Services.top();
-
- if (service.GetName() == name)
- continue;
-
- // TODO: take care of services that are currently being checked
-
- services.push_back(service);
- }
-
- vector<Service>::const_iterator it;
- for (it = services.begin(); it != services.end(); it++)
- m_Services.push(*it);
-
- Application::Log(LogDebug, "checker", "Revoked delegation for service '" + name + "'");
-
string id;
if (request.GetID(&id)) {
ResponseMessage rm;
void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
Application::Log(LogDebug, "checker", "Clearing service delegations.");
+
+ /* clear the services lists */
m_Services = ServiceQueue();
+ m_PendingServices.clear();
+
+ /* TODO: clear checks we've already sent to the thread pool */
string id;
if (request.GetID(&id)) {
VirtualEndpoint::Ptr m_CheckerEndpoint;
ServiceQueue m_Services;
- set<Service> m_PendingServices;
+ set<string> m_PendingServices;
Timer::Ptr m_CheckTimer;
m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>();
m_DelegationEndpoint->RegisterPublication("checker::AssignService");
- m_DelegationEndpoint->RegisterPublication("checker::RevokeService");
+ m_DelegationEndpoint->RegisterPublication("checker::ClearServices");
GetEndpointManager()->RegisterEndpoint(m_DelegationEndpoint);
}
for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++)
histogram[eit->second] = 0;
- /* nothing to do if we have no checkers */
- if (histogram.size() == 0)
- return;
-
vector<Service> services;
/* build "checker -> service count" histogram */
oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
- std::random_shuffle(candidates.begin(), candidates.end());
-
- stringstream msgbuf;
- msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
- Application::Log(LogDebug, "delegation", msgbuf.str());
- int avg_services = 0;
+ int avg_services = 0, overflow_tolerance = 0;
vector<Endpoint::Ptr>::iterator cit;
- for (cit = candidates.begin(); cit != candidates.end(); cit++)
- avg_services += histogram[*cit];
- avg_services /= candidates.size();
- int overflow_tolerance = candidates.size() * 2;
+ if (candidates.size() > 0) {
+ std::random_shuffle(candidates.begin(), candidates.end());
+
+ stringstream msgbuf;
+ msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
+ Application::Log(LogDebug, "delegation", msgbuf.str());
+
+ for (cit = candidates.begin(); cit != candidates.end(); cit++)
+ avg_services += histogram[*cit];
+
+ avg_services /= candidates.size();
+ overflow_tolerance = candidates.size() * 2;
+ }
/* don't re-assign service if the checker is still valid
* and doesn't have too many services */
delegated++;
}
- assert(!service.GetChecker().empty());
+ assert(candidates.size() == 0 || !service.GetChecker().empty());
}
if (delegated > 0) {
*/
void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
{
+ /* ignore local endpoints */
+ if (endpoint->IsLocal())
+ return;
+
/* accept discovery::RegisterComponent messages from any endpoint */
endpoint->RegisterPublication("discovery::RegisterComponent");
time(&(info->LastSeen));
- message.GetNode(&info->Node);
- message.GetService(&info->Service);
+ string node;
+ if (message.GetNode(&node) && !node.empty())
+ info->Node = node;
+
+ string service;
+ if (message.GetService(&service) && !service.empty())
+ info->Service = service;
ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity);
Dictionary::Ptr roles;
string identity = i->first;
ComponentDiscoveryInfo::Ptr info = i->second;
+ /* there's no need to reconnect to ourself */
+ if (identity == GetEndpointManager()->GetIdentity())
+ continue;
+
curr = i;
i++;
} else {
/* TODO: figure out whether we actually want to connect to this component */
/* try and reconnect to this component */
- endpointManager->AddConnection(info->Node, info->Service);
+ try {
+ if (!info->Node.empty() && !info->Service.empty())
+ endpointManager->AddConnection(info->Node, info->Service);
+ } catch (const std::exception& ex) {
+ stringstream msgbuf;
+ msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
+ Application::Log(LogInformation, "discovery", msgbuf.str());
+ }
}
}
}
using namespace icinga;
+/**
+ * Constructor for the EndpointManager class.
+ */
+EndpointManager::EndpointManager(void)
+ : m_NextMessageID(0)
+{
+ m_RequestTimer = boost::make_shared<Timer>();
+ m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
+ m_RequestTimer->SetInterval(5);
+ m_RequestTimer->Start();
+}
+
/**
* Sets the identity of the endpoint manager. This identity is used when
* connecting to remote peers.
*/
void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
{
- if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
- throw invalid_argument("Identity must be empty.");
-
endpoint->SetEndpointManager(GetSelf());
UnregisterEndpoint(endpoint);
pr.Timeout = time(NULL) + timeout;
m_Requests[id] = pr;
- RescheduleRequestTimer();
if (!recipient)
SendAnycastMessage(sender, message);
return a.second.Timeout < b.second.Timeout;
}
-void EndpointManager::RescheduleRequestTimer(void)
-{
- map<string, PendingRequest>::iterator it;
- it = min_element(m_Requests.begin(), m_Requests.end(),
- &EndpointManager::RequestTimeoutLessComparer);
-
- if (!m_RequestTimer) {
- m_RequestTimer = boost::make_shared<Timer>();
- m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
- }
-
- if (it != m_Requests.end()) {
- time_t now;
- time(&now);
-
- time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout;
- m_RequestTimer->SetInterval(next_timeout - now);
- m_RequestTimer->Start();
- } else {
- m_RequestTimer->Stop();
- }
-}
-
void EndpointManager::RequestTimerHandler(void)
{
map<string, PendingRequest>::iterator it;
break;
}
}
-
- RescheduleRequestTimer();
}
void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message)
it->second.Callback(GetSelf(), sender, it->second.Request, message, false);
m_Requests.erase(it);
- RescheduleRequestTimer();
}
EndpointManager::Iterator EndpointManager::Begin(void)
typedef map<string, Endpoint::Ptr>::iterator Iterator;
- EndpointManager(void)
- : m_NextMessageID(0)
- { }
+ EndpointManager(void);
void SetIdentity(string identity);
string GetIdentity(void) const;
void UnregisterServer(JsonRpcServer::Ptr server);
static bool RequestTimeoutLessComparer(const pair<string, PendingRequest>& a, const pair<string, PendingRequest>& b);
- void RescheduleRequestTimer(void);
void RequestTimerHandler(void);
void NewClientHandler(const TcpClient::Ptr& client);
Application::Log(LogWarning, "jsonrpc", message.str());
}
-void JsonRpcEndpoint::VerifyCertificateHandler(bool& valid, const shared_ptr<X509>& certificate)
+void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate)
{
- if (certificate && valid) {
+ if (certificate && *valid) {
string identity = Utility::GetCertificateCN(certificate);
if (GetIdentity().empty() && !identity.empty()) {
void NewMessageHandler(const MessagePart& message);
void ClientClosedHandler(void);
void ClientErrorHandler(const std::exception& ex);
- void VerifyCertificateHandler(bool& valid, const shared_ptr<X509>& certificate);
+ void VerifyCertificateHandler(bool *valid, const shared_ptr<X509>& certificate);
};
}