Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ObjectRemovedHandler, this, _1));
- m_CheckTimer = boost::make_shared<Timer>();
- m_CheckTimer->SetInterval(0.1);
- m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
- m_CheckTimer->Start();
+ boost::thread thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
+ thread.detach();
m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5);
m_Endpoint->Unregister();
}
-void CheckerComponent::CheckTimerHandler(void)
+void CheckerComponent::CheckThreadProc(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
- double now = Utility::GetTime();
- long tasks = 0;
-
- int missedServices = 0, missedChecks = 0;
-
for (;;) {
Service::Ptr service;
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
- if (idx.begin() == idx.end())
- break;
+ while (idx.begin() == idx.end())
+ m_CV.wait(lock);
CheckTimeView::iterator it = idx.begin();
service = it->lock();
idx.erase(it);
continue;
}
+ }
- {
- ObjectLock olock(service);
+ double wait;
- if (service->GetNextCheck() > now)
- break;
- }
+ {
+ ObjectLock olock(service);
+ wait = service->GetNextCheck() - Utility::GetTime();
+ }
- idx.erase(it);
+ if (wait > 0) {
+ /* Make sure the service we just examined can be destroyed while we're waiting. */
+ service.reset();
+
+ /* Wait for the next check. */
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
+
+ continue;
+ }
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_IdleServices.erase(service);
}
- ObjectLock olock(service);
+ ObjectLock olock(service); /* also required for the key extractor */
/* reschedule the service if checks are currently disabled
* for it and this is not a forced check */
service->SetForceNextCheck(false);
- Dictionary::Ptr cr = service->GetLastCheckResult();
-
- if (cr) {
- double lastCheck = cr->Get("execution_end");
- int missed = (Utility::GetTime() - lastCheck) / service->GetCheckInterval() - 1;
-
- if (missed > 0 && !service->GetFirstCheck()) {
- missedChecks += missed;
- missedServices++;
- }
- }
-
service->SetFirstCheck(false);
Logger::Write(LogDebug, "checker", "Executing service check for '" + service->GetName() + "'");
- m_IdleServices.erase(service);
- m_PendingServices.insert(service);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_IdleServices.erase(service);
+ m_PendingServices.insert(service);
+ }
try {
service->BeginExecuteCheck(boost::bind(&CheckerComponent::CheckCompletedHandler, this, service));
} catch (const exception& ex) {
Logger::Write(LogCritical, "checker", "Exception occured while checking service '" + service->GetName() + "': " + diagnostic_information(ex));
}
-
- tasks++;
- }
-
- if (missedServices > 0) {
- stringstream msgbuf;
- msgbuf << "Missed " << missedChecks << " checks for " << missedServices << " services";;
- Logger::Write(LogWarning, "checker", msgbuf.str());
- }
-
- if (tasks > 0) {
- stringstream msgbuf;
- msgbuf << "CheckTimerHandler: created " << tasks << " task(s)";
- Logger::Write(LogDebug, "checker", msgbuf.str());
}
-
- RescheduleCheckTimer();
}
void CheckerComponent::CheckCompletedHandler(const Service::Ptr& service)
{
+ ObjectLock olock(service); /* required for the key extractor */
+
{
boost::mutex::scoped_lock lock(m_Mutex);
if (it != m_PendingServices.end()) {
m_PendingServices.erase(it);
m_IdleServices.insert(service);
+ m_CV.notify_all();
}
}
- RescheduleCheckTimer();
-
- {
- ObjectLock olock(service);
- Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
- }
+ Logger::Write(LogDebug, "checker", "Check finished for service '" + service->GetName() + "'");
}
void CheckerComponent::ResultTimerHandler(void)
void CheckerComponent::CheckerChangedHandler(const Service::Ptr& service)
{
- String checker;
-
- {
- ObjectLock olock(service);
- checker = service->GetChecker();
- }
+ ObjectLock olock(service); /* also required for the key extractor */
+ String checker = service->GetChecker();
if (checker == EndpointManager::GetInstance()->GetIdentity() || checker == m_Endpoint->GetName()) {
boost::mutex::scoped_lock lock(m_Mutex);
return;
m_IdleServices.insert(service);
+ m_CV.notify_all();
} else {
boost::mutex::scoped_lock lock(m_Mutex);
m_IdleServices.erase(service);
m_PendingServices.erase(service);
+ m_CV.notify_all();
}
}
void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service)
{
{
+ ObjectLock olock(service); /* required for the key extractor */
boost::mutex::scoped_lock lock(m_Mutex);
/* remove and re-insert the service from the set in order to force an index update */
if (it == idx.end())
return;
- idx.erase(it);
- idx.insert(service);
+ idx.replace(it, service);
+ m_CV.notify_all();
}
-
- RescheduleCheckTimer();
}
void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
m_IdleServices.erase(service);
m_PendingServices.erase(service);
+ m_CV.notify_all();
}
}
-
-void CheckerComponent::RescheduleCheckTimer(void)
-{
- Service::Ptr service;
-
- {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- if (m_IdleServices.empty())
- return;
-
- typedef nth_index<ServiceSet, 1>::type CheckTimeView;
- CheckTimeView& idx = boost::get<1>(m_IdleServices);
-
- do {
- CheckTimeView::iterator it = idx.begin();
-
- if (it == idx.end())
- return;
-
- service = it->lock();
-
- if (!service)
- idx.erase(it);
- } while (!service);
- }
-
- ObjectLock olock(service);
- m_CheckTimer->Reschedule(service->GetNextCheck());
-}
{
typedef double result_type;
+ /**
+ * @threadsafety Caller must hold the mutex for the service.
+ */
double operator()(const Service::WeakPtr& wservice)
{
Service::Ptr service = wservice.lock();
if (!service)
return 0;
- {
- ObjectLock olock(service);
- return service->GetNextCheck();
- }
+ return service->GetNextCheck();
}
};
Endpoint::Ptr m_Endpoint;
boost::mutex m_Mutex;
+ boost::condition_variable m_CV;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;
- Timer::Ptr m_CheckTimer;
-
Timer::Ptr m_ResultTimer;
- void CheckTimerHandler(void);
+ void CheckThreadProc(void);
void ResultTimerHandler(void);
void CheckCompletedHandler(const Service::Ptr& service);
String command = line;
- {
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
- ProcessCommand(command);
- }
+ ProcessCommand(command);
}
fclose(fp);
void CompatComponent::DumpComments(ofstream& fp, const Service::Ptr& owner, CompatObjectType type)
{
+ ObjectLock olock(owner);
+
Service::Ptr service;
Host::Ptr host;
Dictionary::Ptr comments = owner->GetComments();
void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, CompatObjectType type)
{
+ ObjectLock olock(owner);
+
Dictionary::Ptr downtimes = owner->GetDowntimes();
if (!downtimes)
void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
{
+ ObjectLock olock(host);
+
int state;
if (!host->IsReachable())
state = 2; /* unreachable */
void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host)
{
+ ObjectLock olock(host);
+
fp << "define host {" << "\n"
<< "\t" << "host_name" << "\t" << host->GetName() << "\n"
<< "\t" << "display_name" << "\t" << host->GetDisplayName() << "\n"
void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& service, CompatObjectType type)
{
+ ObjectLock olock(service);
+
String output;
String perfdata;
double schedule_start = -1, schedule_end = -1;
void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& service)
{
+ ObjectLock olock(service);
+
fp << "servicestatus {" << "\n"
<< "\t" << "host_name=" << service->GetHost()->GetName() << "\n"
<< "\t" << "service_description=" << service->GetShortName() << "\n";
void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& service)
{
+ ObjectLock olock(service);
+
fp << "define service {" << "\n"
<< "\t" << "host_name" << "\t" << service->GetHost()->GetName() << "\n"
<< "\t" << "service_description" << "\t" << service->GetShortName() << "\n"
<< "\n";
BOOST_FOREACH(const Service::Ptr& parent, service->GetParentServices()) {
+ ObjectLock plock(parent);
+
fp << "define servicedependency {" << "\n"
<< "\t" << "dependent_host_name" << "\t" << service->GetHost()->GetName() << "\n"
<< "\t" << "dependent_service_description" << "\t" << service->GetShortName() << "\n"
*/
void CompatComponent::StatusTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
Logger::Write(LogInformation, "compat", "Writing compat status information");
String statuspath = GetStatusPath();
<< "# This file is auto-generated. Do not modify this file." << "\n"
<< "\n";
- DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Host")->GetObjects()) {
- const Host::Ptr& host = static_pointer_cast<Host>(object);
+ {
+ DynamicType::Ptr dt = DynamicType::GetByName("Host");
+ ObjectLock dlock(dt);
+
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ Host::Ptr host = static_pointer_cast<Host>(object);
- DumpHostStatus(statusfp, host);
- DumpHostObject(objectfp, host);
+ DumpHostStatus(statusfp, host);
+ DumpHostObject(objectfp, host);
+ }
}
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("HostGroup")->GetObjects()) {
- const HostGroup::Ptr& hg = static_pointer_cast<HostGroup>(object);
+ {
+ DynamicType::Ptr dt = DynamicType::GetByName("Host");
+ ObjectLock dlock(dt);
+
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
+ ObjectLock olock(hg);
- objectfp << "define hostgroup {" << "\n"
- << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
- << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
- << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
+ objectfp << "define hostgroup {" << "\n"
+ << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
+ << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
+ << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
- objectfp << "\t" << "members" << "\t";
- DumpNameList(objectfp, hg->GetMembers());
- objectfp << "\n"
- << "}" << "\n";
+ objectfp << "\t" << "members" << "\t";
+ DumpNameList(objectfp, hg->GetMembers());
+ objectfp << "\n"
+ << "}" << "\n";
+ }
}
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
- const Service::Ptr& service = static_pointer_cast<Service>(object);
+ {
+ DynamicType::Ptr dt = DynamicType::GetByName("Service");
+ ObjectLock dlock(dt);
- DumpServiceStatus(statusfp, service);
- DumpServiceObject(objectfp, service);
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ Service::Ptr service = static_pointer_cast<Service>(object);
+
+ DumpServiceStatus(statusfp, service);
+ DumpServiceObject(objectfp, service);
+ }
}
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("ServiceGroup")->GetObjects()) {
- const ServiceGroup::Ptr& sg = static_pointer_cast<ServiceGroup>(object);
+ {
+ DynamicType::Ptr dt = DynamicType::GetByName("ServiceGroup");
+ ObjectLock dlock(dt);
- objectfp << "define servicegroup {" << "\n"
- << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
- << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
- << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
+ ObjectLock olock(sg);
- objectfp << "\t" << "members" << "\t";
+ objectfp << "define servicegroup {" << "\n"
+ << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
+ << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
+ << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
- vector<String> sglist;
- BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
- sglist.push_back(service->GetHost()->GetName());
- sglist.push_back(service->GetShortName());
- }
+ objectfp << "\t" << "members" << "\t";
- DumpStringList(objectfp, sglist);
+ vector<String> sglist;
+ BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
+ ObjectLock slock(service);
+ Host::Ptr host = service->GetHost();
- objectfp << "\n"
- << "}" << "\n";
+ ObjectLock hlock(host);
+ sglist.push_back(host->GetName());
+
+ sglist.push_back(service->GetShortName());
+ }
+
+ DumpStringList(objectfp, sglist);
+
+ objectfp << "\n"
+ << "}" << "\n";
+ }
}
statusfp.close();
else
first = false;
+ ObjectLock olock(*it);
fp << (*it)->GetName();
}
}
return (endpoint->HasSubscription("checker"));
}
-vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
+set<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service::Ptr& service) const
{
- vector<Endpoint::Ptr> candidates;
+ set<Endpoint::Ptr> candidates;
+
+ DynamicType::Ptr dt = DynamicType::GetByName("Endpoint");
+ ObjectLock dlock(dt);
DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+ ObjectLock olock(endpoint);
String myIdentity = EndpointManager::GetInstance()->GetIdentity();
if (!service->IsAllowedChecker(endpoint->GetName()))
continue;
- candidates.push_back(endpoint);
+ candidates.insert(endpoint);
}
return candidates;
void DelegationComponent::DelegationTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
map<Endpoint::Ptr, int> histogram;
- DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
- Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
+ {
+ DynamicType::Ptr dt = DynamicType::GetByName("Endpoint");
+ ObjectLock dlock(dt);
+
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
- histogram[endpoint] = 0;
+ histogram[endpoint] = 0;
+ }
}
vector<Service::Ptr> services;
- /* build "checker -> service count" histogram */
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
- Service::Ptr service = dynamic_pointer_cast<Service>(object);
+ {
+ /* build "checker -> service count" histogram */
+ DynamicType::Ptr dt = DynamicType::GetByName("Service");
+ ObjectLock dlock(dt);
- if (!service)
- continue;
+ DynamicObject::Ptr object;
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ Service::Ptr service = dynamic_pointer_cast<Service>(object);
- services.push_back(service);
+ if (!service)
+ continue;
- String checker = service->GetChecker();
- if (checker.IsEmpty())
- continue;
+ services.push_back(service);
- if (!Endpoint::Exists(checker))
- continue;
+ ObjectLock olock(service);
+ String checker = service->GetChecker();
+ if (checker.IsEmpty())
+ continue;
+
+ if (!Endpoint::Exists(checker))
+ continue;
- Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
+ Endpoint::Ptr endpoint = Endpoint::GetByName(checker);
- histogram[endpoint]++;
+ histogram[endpoint]++;
+ }
}
- std::random_shuffle(services.begin(), services.end());
+ //std::random_shuffle(services.begin(), services.end());
int delegated = 0;
/* re-assign services */
BOOST_FOREACH(const Service::Ptr& service, services) {
+ ObjectLock olock(service);
+
String checker = service->GetChecker();
Endpoint::Ptr oldEndpoint;
if (Endpoint::Exists(checker))
oldEndpoint = Endpoint::GetByName(checker);
- vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
+ set<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
int avg_services = 0, overflow_tolerance = 0;
vector<Endpoint::Ptr>::iterator cit;
if (candidates.size() > 0) {
- std::random_shuffle(candidates.begin(), candidates.end());
+ //std::random_shuffle(candidates.begin(), candidates.end());
stringstream msgbuf;
msgbuf << "Service: " << service->GetName() << ", candidates: " << candidates.size();
/* don't re-assign service if the checker is still valid
* and doesn't have too many services */
+
+ ObjectLock elock(oldEndpoint);
+
if (oldEndpoint && oldEndpoint->IsConnected() &&
- find(candidates.begin(), candidates.end(), oldEndpoint) != candidates.end() &&
+ candidates.find(oldEndpoint) != candidates.end() &&
histogram[oldEndpoint] <= avg_services + overflow_tolerance)
continue;
if (histogram[candidate] > avg_services)
continue;
+ ObjectLock clock(candidate);
service->SetChecker(candidate->GetName());
histogram[candidate]++;
void DelegationTimerHandler(void);
- vector<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
+ set<Endpoint::Ptr> GetCheckerCandidates(const Service::Ptr& service) const;
static bool IsEndpointChecker(const Endpoint::Ptr& endpoint);
*/
void DemoComponent::DemoTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
Logger::Write(LogInformation, "demo", "Sending multicast 'hello"
" world' message.");
RequestMessage request;
request.SetMethod("demo::HelloWorld");
- EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint,
- request);
+ EndpointManager::Ptr em = EndpointManager::GetInstance();
+
+ ObjectLock olock(em);
+ em->SendMulticastMessage(m_Endpoint, request);
}
/**
void NotificationComponent::Start(void)
{
m_Endpoint = Endpoint::MakeEndpoint("notification", false);
+
+ ObjectLock olock(m_Endpoint);
m_Endpoint->RegisterTopicHandler("icinga::SendNotifications",
boost::bind(&NotificationComponent::SendNotificationsRequestHandler, this, _2,
_3));
*/
void NotificationComponent::NotificationTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
// TODO: implement
}
return;
Service::Ptr service = Service::GetByName(svc);
+
+ ObjectLock olock(service);
service->SendNotifications(static_cast<NotificationType>(type));
}
static void ReloadConfigTimerHandler(void)
{
if (g_ReloadConfig) {
- {
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
- Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
- LoadConfigFiles(false);
- }
+ Logger::Write(LogInformation, "icinga-app", "Received SIGHUP. Reloading config files.");
+ LoadConfigFiles(false);
g_ReloadConfig = false;
}
using namespace icinga;
-recursive_mutex Application::m_Mutex;
Application *Application::m_Instance = NULL;
bool Application::m_ShuttingDown = false;
bool Application::m_Debugging = false;
m_ArgV = argv;
}
-void Application::NewTxTimerHandler(void)
-{
- DynamicObject::NewTx();
-}
-
#ifdef _DEBUG
void Application::ProfileTimerHandler(void)
{
thread t(&Application::TimeWatchThreadProc);
t.detach();
- /* Set up a timer to periodically flush the tx. */
- Timer::Ptr newTxTimer = boost::make_shared<Timer>();
- newTxTimer->OnTimerExpired.connect(boost::bind(&Application::NewTxTimerHandler));
- newTxTimer->SetInterval(0.5);
- newTxTimer->Start();
-
/* Set up a timer that watches the m_Shutdown flag. */
Timer::Ptr shutdownTimer = boost::make_shared<Timer>();
shutdownTimer->OnTimerExpired.connect(boost::bind(&Application::ShutdownTimerHandler));
flushTxTimer->Start();
#endif /* _DEBUG */
- GetEQ().Run();
+ GetEQ().Join();
}
/**
m_PkgDataDir = path;
}
-/**
- * Returns the global mutex.
- *
- * @returns The mutex.
- */
-recursive_mutex& Application::GetMutex(void)
-{
- return m_Mutex;
-}
-
/**
* Returns the main thread's event queue.
*
static String GetPkgDataDir(void);
static void SetPkgDataDir(const String& path);
- static recursive_mutex& GetMutex(void);
-
static EventQueue& GetEQ(void);
protected:
void RunEventLoop(void) const;
private:
- static recursive_mutex m_Mutex; /**< The global mutex. */
static Application *m_Instance; /**< The application instance. */
static bool m_ShuttingDown; /**< Whether the application is in the process of
void Start(const CompletionCallback& completionCallback = CompletionCallback())
{
m_CompletionCallback = completionCallback;
-
- try {
- Run();
- } catch (...) {
- FinishException(boost::current_exception());
- }
+ Utility::QueueAsyncCallback(boost::bind(&AsyncTask<TClass, TResult>::Run, this));
}
/**
}
-#endif /* CONNECTION_H */
\ No newline at end of file
+#endif /* CONNECTION_H */
*
* @param key The key whose value should be retrieved.
* @returns The value of an empty value if the key was not found.
+ * @threadsafety Always.
*/
Value Dictionary::Get(const char *key) const
{
+ ObjectLock olock(this);
+
map<String, Value>::const_iterator it;
it = std::lower_bound(m_Data.begin(), m_Data.end(), key, DictionaryKeyLessComparer());
*
* @param key The key whose value should be retrieved.
* @returns The value or an empty value if the key was not found.
+ * @threadsafety Always.
*/
Value Dictionary::Get(const String& key) const
{
*
* @param key The key.
* @param value The value.
+ * @threadsafety Always.
*/
void Dictionary::Set(const String& key, const Value& value)
{
+ ObjectLock olock(this);
+
if (value.IsEmpty()) {
Remove(key);
return;
*
* @param value The value.
* @returns The key that was used to add the new item.
+ * @threadsafety Always.
*/
String Dictionary::Add(const Value& value)
{
+ ObjectLock olock(this);
+
Dictionary::Iterator it;
String key;
long index = GetLength();
* Returns the number of elements in the dictionary.
*
* @returns Number of elements.
+ * @threadsafety Always.
*/
size_t Dictionary::GetLength(void) const
{
+ ObjectLock olock(this);
+
return m_Data.size();
}
*
* @param key The key.
* @returns true if the dictionary contains the key, false otherwise.
+ * @threadsafety Always.
*/
bool Dictionary::Contains(const String& key) const
{
+ ObjectLock olock(this);
+
return (m_Data.find(key) != m_Data.end());
}
* Removes the specified key from the dictionary.
*
* @param key The key.
+ * @threadsafety Always.
*/
void Dictionary::Remove(const String& key)
{
+ ObjectLock olock(this);
+
Dictionary::Iterator it;
it = m_Data.find(key);
* Makes a shallow copy of a dictionary.
*
* @returns a copy of the dictionary.
+ * @threadsafety Always.
*/
Dictionary::Ptr Dictionary::ShallowClone(void) const
{
+ ObjectLock olock(this);
+
Dictionary::Ptr clone = boost::make_shared<Dictionary>();
String key;
*
* @param json The JSON object.
* @returns A dictionary that is equivalent to the JSON object.
+ * @threadsafety Always.
*/
Dictionary::Ptr Dictionary::FromJson(cJSON *json)
{
*
* @returns A JSON object that is equivalent to the dictionary. Values that
* cannot be represented in JSON are omitted.
+ * @threadsafety Always.
*/
cJSON *Dictionary::ToJson(void) const
{
cJSON *json = cJSON_CreateObject();
try {
+ ObjectLock olock(this);
+
String key;
Value value;
BOOST_FOREACH(tie(key, value), m_Data) {
double DynamicObject::m_CurrentTx = 0;
set<DynamicObject *> DynamicObject::m_ModifiedObjects;
-boost::mutex DynamicObject::m_ModifiedObjectsMutex;
+boost::mutex DynamicObject::m_TransactionMutex;
+boost::once_flag DynamicObject::m_TransactionOnce;
+Timer::Ptr DynamicObject::m_TransactionTimer;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
* The DynamicObject::Create function takes care of restoring
* non-config state after the object has been fully constructed */
ApplyUpdate(serializedObject, Attribute_Config);
+
+ boost::call_once(m_TransactionOnce, &DynamicObject::Initialize);
}
/*
*/
DynamicObject::~DynamicObject(void)
{
- boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+ boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.erase(this);
}
+void DynamicObject::Initialize(void)
+{
+ /* Set up a timer to periodically create a new transaction. */
+ m_TransactionTimer = boost::make_shared<Timer>();
+ m_TransactionTimer->SetInterval(0.5);
+ m_TransactionTimer->OnTimerExpired.connect(boost::bind(&DynamicObject::NewTx));
+ m_TransactionTimer->Start();
+}
+
+/**
+ * @threadsafety Always.
+ */
void DynamicObject::SendLocalUpdateEvents(void)
{
map<String, Value, string_iless>::iterator it;
m_ConfigTx = tx;
{
- boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+ boost::mutex::scoped_lock lock(m_TransactionMutex);
m_ModifiedObjects.insert(this);
}
void DynamicObject::Register(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
DynamicType::Ptr dtype = GetType();
DynamicObject::Ptr dobj = dtype->GetObject(GetName());
void DynamicObject::Unregister(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
DynamicType::Ptr dtype = GetType();
+ ObjectLock olock(dtype);
if (!dtype || !dtype->GetObject(GetName()))
return;
if (!value.IsObjectType<Dictionary>())
return ScriptTask::Ptr();
+ String funcName;
Dictionary::Ptr methods = value;
- if (!methods->Contains(method))
- return ScriptTask::Ptr();
- String funcName = methods->Get(method);
+ {
+ ObjectLock olock(methods);
+ if (!methods->Contains(method))
+ return ScriptTask::Ptr();
+
+ funcName = methods->Get(method);
+ }
ScriptFunction::Ptr func = ScriptFunction::GetByName(funcName);
*/
void DynamicObject::DumpObjects(const String& filename)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
Logger::Write(LogInformation, "base", "Dumping program state to file '" + filename + "'");
String tempFilename = filename + ".tmp";
*/
void DynamicObject::RestoreObjects(const String& filename)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
std::fstream fp;
bool hasConfig = update->Contains("configTx");
DynamicType::Ptr dt = DynamicType::GetByName(type);
+ ObjectLock dlock(dt);
if (!dt)
BOOST_THROW_EXCEPTION(invalid_argument("Invalid type: " + type));
Logger::Write(LogDebug, "base", msgbuf.str());
}
-/*
- * @threadsafety Always.
- */
void DynamicObject::DeactivateObjects(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
DynamicType::TypeMap::iterator tt;
for (tt = DynamicType::GetTypes().begin(); tt != DynamicType::GetTypes().end(); tt++) {
DynamicType::NameMap::iterator nt;
*/
double DynamicObject::GetCurrentTx(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
+ boost::mutex::scoped_lock lock(m_TransactionMutex);
assert(m_CurrentTx != 0);
}
/*
- * @threadsafety Always.
+ * @threadsafety Always. Caller must not hold any Object locks.
*/
void DynamicObject::NewTx(void)
{
+ double tx;
set<DynamicObject *> objects;
{
- boost::mutex::scoped_lock lock(m_ModifiedObjectsMutex);
+ boost::mutex::scoped_lock lock(m_TransactionMutex);
- /* Some objects may accidentally bleed into the next transaction because
- * we're not holding the global mutex while "stealing" the modified objects,
- * but that's entirely ok. */
+ tx = m_CurrentTx;
m_ModifiedObjects.swap(objects);
+ m_CurrentTx = Utility::GetTime();
}
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
BOOST_FOREACH(DynamicObject *object, objects) {
+ ObjectLock olock(object);
object->SendLocalUpdateEvents();
}
- OnTransactionClosing(m_CurrentTx, objects);
- m_CurrentTx = Utility::GetTime();
+ OnTransactionClosing(tx, objects);
}
void DynamicObject::OnInitCompleted(void)
*/
DynamicObject::Ptr DynamicObject::GetObject(const String& type, const String& name)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
DynamicType::Ptr dtype = DynamicType::GetByName(type);
- return dtype->GetObject(name);
+
+ {
+ ObjectLock olock(dtype);
+ return dtype->GetObject(name);
+ }
}
const DynamicObject::AttributeMap& DynamicObject::GetAttributes(void) const
DynamicObject(const Dictionary::Ptr& serializedObject);
~DynamicObject(void);
+ static void Initialize(void);
+
Dictionary::Ptr BuildUpdate(double sinceTx, int attributeTypes) const;
void ApplyUpdate(const Dictionary::Ptr& serializedUpdate, int allowedTypes);
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject *> m_ModifiedObjects;
- static boost::mutex m_ModifiedObjectsMutex;
+ static boost::mutex m_TransactionMutex;
+ static boost::once_flag m_TransactionOnce;
+ static Timer::Ptr m_TransactionTimer;
friend class DynamicType; /* for OnInitCompleted. */
};
using namespace icinga;
-boost::mutex DynamicType::m_Mutex;
-
DynamicType::DynamicType(const String& name, const DynamicType::ObjectFactory& factory)
: m_Name(name), m_ObjectFactory(factory)
{ }
*/
DynamicType::Ptr DynamicType::GetByName(const String& name)
{
- boost::mutex::scoped_lock lock(m_Mutex);
+ boost::mutex::scoped_lock lock(GetStaticMutex());
DynamicType::TypeMap::const_iterator tt = GetTypes().find(name);
}
/**
- * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ * @threadsafety Caller must hold DynamicType::GetStaticMutex() while using the map.
*/
DynamicType::TypeMap& DynamicType::GetTypes(void)
{
}
/**
- * @threadsafety Caller must hold DynamicType::m_Mutex while using the map.
+ * @threadsafety Caller must hold DynamicType::GetStaticMutex() while using the map.
*/
DynamicType::NameMap& DynamicType::GetObjects(void)
{
*/
void DynamicType::RegisterType(const DynamicType::Ptr& type)
{
- boost::mutex::scoped_lock lock(m_Mutex);
+ boost::mutex::scoped_lock lock(GetStaticMutex());
DynamicType::TypeMap::const_iterator tt = GetTypes().find(type->GetName());
for (int i = 0; i < attributeCount; i++)
AddAttribute(attributes[i].Name, attributes[i].Type);
}
+
+boost::mutex& DynamicType::GetStaticMutex(void)
+{
+ static boost::mutex mutex;
+ return mutex;
+}
void AddAttributes(const AttributeDescription *attributes, int attributeCount);
private:
- static boost::mutex m_Mutex;
String m_Name;
ObjectFactory m_ObjectFactory;
map<String, DynamicAttributeType> m_Attributes;
NameMap m_Objects;
+
+ static boost::mutex& GetStaticMutex(void);
};
/**
*/
EventQueue::EventQueue(void)
: m_Stopped(false)
-{ }
+{
+ int cpus = thread::hardware_concurrency();
+
+ if (cpus < 4)
+ cpus = 4;
+
+ for (int i = 0; i < cpus; i++)
+ m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
+}
/**
* @threadsafety Always.
EventQueue::~EventQueue(void)
{
Stop();
+ Join();
}
/**
}
/**
- * Spawns worker threads and waits for them to complete.
+ * Waits for all worker threads to finish.
*
* @threadsafety Always.
*/
-void EventQueue::Run(void)
+void EventQueue::Join(void)
{
- thread_group threads;
-
- int cpus = thread::hardware_concurrency();
-
- if (cpus == 0)
- cpus = 4;
-
- for (int i = 0; i < cpus * 4; i++)
- threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
-
- threads.join_all();
+ m_Threads.join_all();
}
/**
EventQueue(void);
~EventQueue(void);
- void Run(void);
- void Post(const Callback& callback);
-
void Stop(void);
+ void Join(void);
+
+ void Post(const Callback& callback);
private:
- boost::thread::id m_Owner;
+ thread_group m_Threads;
boost::mutex m_Mutex;
condition_variable m_CV;
entry.Facility = facility;
entry.Message = message;
- {
- recursive_mutex::scoped_lock lock(Application::GetMutex());
- ForwardLogEntry(entry);
- }
+ ForwardLogEntry(entry);
}
/**
DynamicType::Ptr dt = DynamicType::GetByName("Logger");
DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
- Logger::Ptr logger = dynamic_pointer_cast<Logger>(object);
- if (entry.Severity >= logger->GetMinSeverity())
- logger->m_Impl->ProcessLogEntry(entry);
+ {
+ ObjectLock olock(dt);
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
+ Logger::Ptr logger = dynamic_pointer_cast<Logger>(object);
+
+ {
+ ObjectLock llock(logger);
+
+ if (entry.Severity >= logger->GetMinSeverity())
+ logger->m_Impl->ProcessLogEntry(entry);
+ }
- processed = true;
+ processed = true;
+ }
}
LogSeverity defaultLogLevel;
* Returns a reference-counted pointer to this object.
*
* @returns A shared_ptr object that points to this object
+ * @threadsafety Always.
*/
Object::SharedPtrHolder Object::GetSelf(void)
{
+ ObjectLock olock(this);
return Object::SharedPtrHolder(shared_from_this());
}
* @returns The object's mutex.
* @threadsafety Always.
*/
-recursive_mutex& Object::GetMutex(void)
+recursive_mutex& Object::GetMutex(void) const
{
return m_Mutex;
}
SharedPtrHolder GetSelf(void);
- recursive_mutex& GetMutex(void);
+ recursive_mutex& GetMutex(void) const;
protected:
Object(void);
Object(const Object& other);
Object& operator=(const Object& rhs);
- recursive_mutex m_Mutex;
+ mutable recursive_mutex m_Mutex;
};
/**
struct ObjectLock {
public:
ObjectLock(const Object::Ptr& object)
- : m_Lock(object->GetMutex())
- { }
+#ifdef _DEBUG
+ : m_Lock(), m_Object(object)
+#endif /* _DEBUG */
+ {
+ if (object)
+ m_Lock = recursive_mutex::scoped_lock(object->GetMutex());
+ }
- ObjectLock(Object *object)
- : m_Lock(object->GetMutex())
- { }
+ ObjectLock(const Object *object)
+#ifdef _DEBUG
+ : m_Lock(), m_Object(object->GetSelf())
+#endif /* _DEBUG */
+ {
+ if (object)
+ m_Lock = recursive_mutex::scoped_lock(object->GetMutex());
+ }
+
+#ifdef _DEBUG
+ ~ObjectLock(void)
+ {
+ assert(m_Object.lock());
+ }
+#endif /* _DEBUG */
private:
recursive_mutex::scoped_lock m_Lock;
+#ifdef _DEBUG
+ Object::WeakPtr m_Object;
+#endif /* _DEBUG */
};
/**
}
-#endif /* STDIOSTREAM_H */
\ No newline at end of file
+#endif /* STDIOSTREAM_H */
Logger::Write(LogWarning, "base", msgbuf.str());
}
+ /* Re-enable the timer so it can be called again. */
+ m_Started = true;
Reschedule();
}
{
boost::call_once(&Timer::Initialize, m_ThreadOnce);
+ m_Started = true;
+
Reschedule();
}
void Timer::Stop(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
+
+ m_Started = false;
m_Timers.erase(GetSelf());
/* Notify the worker thread that we've disabled a timer. */
m_Next = next;
- /* Remove and re-add the timer to update the index. */
- m_Timers.erase(GetSelf());
- m_Timers.insert(GetSelf());
+ if (m_Started) {
+ /* Remove and re-add the timer to update the index. */
+ m_Timers.erase(GetSelf());
+ m_Timers.insert(GetSelf());
- /* Notify the worker that we've rescheduled a timer. */
- m_CV.notify_all();
+ /* Notify the worker that we've rescheduled a timer. */
+ m_CV.notify_all();
+ }
}
/**
/* Remove the timer from the list so it doesn't get called again
* until the current call is completed. */
+ timer->m_Started = false;
m_Timers.erase(timer);
/* Asynchronously call the timer. */
private:
double m_Interval; /**< The interval of the timer. */
double m_Next; /**< When the next event should happen. */
+ bool m_Started; /**< Whether the timer is enabled. */
typedef multi_index_container<
Timer::WeakPtr,
ioctlsocket(s, FIONBIO, &lTrue);
#endif /* _WIN32 */
}
+
+void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback)
+{
+ Application::GetEQ().Post(callback);
+}
static bool Glob(const String& pathSpec, const function<void (const String&)>& callback);
+ static void QueueAsyncCallback(const boost::function<void (void)>& callback);
+
static
#ifdef _WIN32
HMODULE
void ConfigItem::InternalLink(const Dictionary::Ptr& dictionary) const
{
BOOST_FOREACH(const String& name, m_Parents) {
- ConfigItem::Ptr parent = ConfigItem::GetObject(GetType(), name);
+ ConfigItem::Ptr parent;
+
+ ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+
+ if (context)
+ parent = context->GetItem(GetType(), name);
+
+ /* ignore already active objects while we're in the compiler
+ * context and linking to existing items is disabled. */
+ if (!parent && (!context || (context->GetFlags() & CompilerLinkExisting)))
+ parent = ConfigItem::GetObject(GetType(), name);
if (!parent) {
stringstream message;
if (it != m_Items.end()) {
/* Unregister the old item from its parents. */
ConfigItem::Ptr oldItem = it->second;
+ ObjectLock olock(oldItem);
oldItem->UnregisterFromParents();
/* Steal the old item's children. */
/* Register this item with its parents. */
BOOST_FOREACH(const String& parentName, m_Parents) {
ConfigItem::Ptr parent = GetObject(GetType(), parentName);
+ ObjectLock olock(parent);
parent->RegisterChild(GetSelf());
}
/* Update or create the object and apply the configuration settings. */
DynamicObject::Ptr dobj = m_DynamicObject.lock();
- if (!dobj)
+ if (!dobj) {
+ ObjectLock dlock(dtype);
dobj = dtype->GetObject(GetName());
+ }
+
+ bool was_null = false;
- if (!dobj)
+ if (!dobj) {
+ ObjectLock dlock(dtype);
dobj = dtype->CreateObject(update);
- else
- dobj->ApplyUpdate(update, Attribute_Config);
+ was_null = true;
+ }
- m_DynamicObject = dobj;
+ {
+ ObjectLock olock(dobj);
- if (dobj->IsAbstract())
- dobj->Unregister();
- else
- dobj->Register();
+ if (!was_null)
+ dobj->ApplyUpdate(update, Attribute_Config);
+
+ m_DynamicObject = dobj;
+
+ if (dobj->IsAbstract())
+ dobj->Unregister();
+ else
+ dobj->Register();
+ }
/* We need to make a copy of the child objects because the
* OnParentCommitted() handler is going to update the list. */
if (!child)
continue;
+ ObjectLock olock(child);
child->OnParentCommitted();
}
{
DynamicObject::Ptr dobj = m_DynamicObject.lock();
- if (dobj)
+ if (dobj) {
+ ObjectLock olock(dobj);
dobj->Unregister();
+ }
ConfigItem::ItemMap::iterator it;
it = m_Items.find(make_pair(GetType(), GetName()));
BOOST_FOREACH(const String& parentName, m_Parents) {
ConfigItem::Ptr parent = GetObject(GetType(), parentName);
- if (parent)
+ if (parent) {
+ ObjectLock olock(parent);
parent->UnregisterChild(GetSelf());
+ }
}
}
*/
ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
{
- {
- recursive_mutex::scoped_lock lockg(Application::GetMutex());
-
- ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
-
- if (context) {
- ConfigItem::Ptr item = context->GetItem(type, name);
-
- if (item)
- return item;
-
- /* ignore already active objects while we're in the compiler
- * context and linking to existing items is disabled. */
- if ((context->GetFlags() & CompilerLinkExisting) == 0)
- return ConfigItem::Ptr();
- }
- }
-
{
boost::mutex::scoped_lock lock(m_Mutex);
}
/**
- * @threadsafety Caller must hold the global mutex.
+ * @threadsafety Always.
*/
void ConfigItem::UnloadUnit(const String& unit)
{
vector<ConfigItem::Ptr> obsoleteItems;
- ConfigItem::Ptr item;
- BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
- if (item->GetUnit() != unit)
- continue;
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
- obsoleteItems.push_back(item);
+ ConfigItem::Ptr item;
+ BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
+ ObjectLock olock(item);
+
+ if (item->GetUnit() != unit)
+ continue;
+
+ obsoleteItems.push_back(item);
+ }
}
- BOOST_FOREACH(item, obsoleteItems) {
+ BOOST_FOREACH(const ConfigItem::Ptr& item, obsoleteItems) {
+ ObjectLock olock(item);
item->Unregister();
}
}
}
BOOST_FOREACH(const String& parent, m_Parents) {
- ConfigItem::Ptr item = ConfigItem::GetObject(m_Type, parent);
+ ConfigItem::Ptr item;
+
+ ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+
+ if (context)
+ item = context->GetItem(m_Type, parent);
+
+ /* ignore already active objects while we're in the compiler
+ * context and linking to existing items is disabled. */
+ if (!item && (!context || (context->GetFlags() & CompilerLinkExisting)))
+ item = ConfigItem::GetObject(m_Type, parent);
if (!item) {
stringstream msgbuf;
callback = it->second;
}
- {
- recursive_mutex::scoped_lock lock(Application::GetMutex());
- callback(time, arguments);
- }
-
+ callback(time, arguments);
}
/**
set<Service::Ptr> services;
BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
- Service::Ptr hcService = service->GetHost()->GetHostCheckService();
+ Host::Ptr host = service->GetHost();
+ Service::Ptr hcService = host->GetHostCheckService();
if (hcService)
services.insert(hcService);
}
Logger::Write(LogInformation, "icinga", "Creating comment for host " + host->GetName());
Service::Ptr service = host->GetHostCheckService();
- if (service) {
+ if (service)
(void) service->AddComment(CommentUser, arguments[2], arguments[3], 0);
- }
}
void ExternalCommandProcessor::DelHostComment(double, const vector<String>& arguments)
Logger::Write(LogInformation, "icinga", "Removing all comments for host " + host->GetName());
Service::Ptr service = host->GetHostCheckService();
- if (service) {
+ if (service)
service->RemoveAllComments();
- }
}
void ExternalCommandProcessor::DelAllSvcComments(double, const vector<String>& arguments)
continue;
}
- if (!ConfigItem::GetObject("Service", name)) {
+ ConfigItem::Ptr item;
+
+ ConfigCompilerContext *context = ConfigCompilerContext::GetContext();
+
+ if (context)
+ item = context->GetItem("Service", name);
+
+ /* ignore already active objects while we're in the compiler
+ * context and linking to existing items is disabled. */
+ if (!item && (!context || (context->GetFlags() & CompilerLinkExisting)))
+ item = ConfigItem::GetObject("Service", name);
+
+ if (!item) {
ConfigCompilerContext::GetContext()->AddError(false, "Validation failed for " +
location + ": Service '" + name + "' not found.");
}
: m_Task(task), m_Process(process)
{ }
-/**
- * @threadsafety Always.
- */
void PluginCheckTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Service must be specified."));
if (!vservice.IsObjectType<Service>())
BOOST_THROW_EXCEPTION(invalid_argument("Argument must be a service."));
- Service::Ptr service = vservice;
-
vector<Dictionary::Ptr> macroDicts;
- macroDicts.push_back(service->GetMacros());
- macroDicts.push_back(service->CalculateDynamicMacros());
- macroDicts.push_back(service->GetHost()->GetMacros());
- macroDicts.push_back(service->GetHost()->CalculateDynamicMacros());
- macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
+ Value raw_command;
+ Host::Ptr host;
+
+ {
+ Service::Ptr service = vservice;
+ ObjectLock olock(service);
+ macroDicts.push_back(service->GetMacros());
+ macroDicts.push_back(service->CalculateDynamicMacros());
+ raw_command = service->GetCheckCommand();
+ host = service->GetHost();
+ }
+
+ {
+ ObjectLock olock(host);
+ macroDicts.push_back(host->GetMacros());
+ macroDicts.push_back(host->CalculateDynamicMacros());
+ }
+
+ {
+ IcingaApplication::Ptr app = IcingaApplication::GetInstance();
+ ObjectLock olock(app);
+ macroDicts.push_back(app->GetMacros());
+ }
+
Dictionary::Ptr macros = MacroProcessor::MergeMacroDicts(macroDicts);
- Value command = MacroProcessor::ResolveMacros(service->GetCheckCommand(), macros);
+ Value command = MacroProcessor::ResolveMacros(raw_command, macros);
Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), macros);
process->Start(boost::bind(&PluginCheckTask::ProcessFinishedHandler, ct));
}
-/**
- * @threadsafety Always.
- */
void PluginCheckTask::ProcessFinishedHandler(PluginCheckTask ct)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
ProcessResult pr;
try {
*/
void PluginNotificationTask::ScriptFunc(const ScriptTask::Ptr& task, const vector<Value>& arguments)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
if (arguments.size() < 1)
BOOST_THROW_EXCEPTION(invalid_argument("Missing argument: Notification target must be specified."));
if (!arguments[0].IsObjectType<Notification>())
BOOST_THROW_EXCEPTION(invalid_argument("Argument must be a service."));
- Notification::Ptr notification = arguments[0];
NotificationType type = static_cast<NotificationType>(static_cast<int>(arguments[1]));
vector<Dictionary::Ptr> macroDicts;
- macroDicts.push_back(notification->GetMacros());
- macroDicts.push_back(notification->GetService()->GetMacros());
- macroDicts.push_back(notification->GetService()->CalculateDynamicMacros());
- macroDicts.push_back(notification->GetService()->GetHost()->GetMacros());
- macroDicts.push_back(notification->GetService()->GetHost()->CalculateDynamicMacros());
- macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
+ Value raw_command;
+ Service::Ptr service;
+ Host::Ptr host;
+ String service_name;
+
+ {
+ Notification::Ptr notification = arguments[0];
+ ObjectLock olock(notification);
+ macroDicts.push_back(notification->GetMacros());
+ raw_command = notification->GetNotificationCommand();
+ service = notification->GetService();
+ }
+
+ {
+ ObjectLock olock(service);
+ macroDicts.push_back(service->GetMacros());
+ macroDicts.push_back(service->CalculateDynamicMacros());
+ service_name = service->GetName();
+ host = service->GetHost();
+ }
+
+ {
+ ObjectLock olock(host);
+ macroDicts.push_back(host->GetMacros());
+ macroDicts.push_back(host->CalculateDynamicMacros());
+ }
+
+ {
+ IcingaApplication::Ptr app = IcingaApplication::GetInstance();
+ ObjectLock olock(app);
+ macroDicts.push_back(app->GetMacros());
+ }
+
Dictionary::Ptr macros = MacroProcessor::MergeMacroDicts(macroDicts);
- Value command = MacroProcessor::ResolveMacros(notification->GetNotificationCommand(), macros);
+ Value command = MacroProcessor::ResolveMacros(raw_command, macros);
Process::Ptr process = boost::make_shared<Process>(Process::SplitCommand(command), macros);
- PluginNotificationTask ct(task, process, notification->GetService()->GetName(), command);
+ PluginNotificationTask ct(task, process, service_name, command);
process->Start(boost::bind(&PluginNotificationTask::ProcessFinishedHandler, ct));
}
*/
void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
ProcessResult pr;
try {
rm.SetParams(params);
- EndpointManager::GetInstance()->SendMulticastMessage(rm);
+ EndpointManager::Ptr em = EndpointManager::GetInstance();
+ ObjectLock olock(em);
+ em->SendMulticastMessage(rm);
}
void Service::UpdateStatistics(const Dictionary::Ptr& cr)
void Service::CommentsExpireTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
+ DynamicType::Ptr dt = DynamicType::GetByName("Service");
+ ObjectLock dlock(dt);
DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
+ ObjectLock olock(service);
service->RemoveExpiredComments();
}
}
void Service::DowntimesExpireTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
+ DynamicType::Ptr dt = DynamicType::GetByName("Service");
+ ObjectLock dlock(dt);
DynamicObject::Ptr object;
- BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Service")->GetObjects()) {
+ BOOST_FOREACH(tie(tuples::ignore, object), dt->GetObjects()) {
Service::Ptr service = dynamic_pointer_cast<Service>(object);
+ ObjectLock slock(service);
service->RemoveExpiredDowntimes();
}
}
Service::Ptr Service::GetByNamePair(const String& hostName, const String& serviceName)
{
if (!hostName.IsEmpty()) {
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
Host::Ptr host = Host::GetByName(hostName);
+ ObjectLock olock(host);
return host->GetServiceByShortName(serviceName);
} else {
return Service::GetByName(serviceName);
return NULL;
}
- {
- recursive_mutex::scoped_lock lock(Application::GetMutex());
- interp->RegisterPythonFunction(name, object);
- }
-
+ interp->RegisterPythonFunction(name, object);
Py_INCREF(Py_None);
return Py_None;
if (it == m_TopicHandlers.end())
return;
- (*it->second)(GetSelf(), sender, request);
+ Application::GetEQ().Post(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else {
GetClient()->SendMessage(request);
}
{
return Get("service");
}
-
void EndpointManager::SubscriptionTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
DynamicObject::Ptr object;
void EndpointManager::ReconnectTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
DynamicObject::Ptr object;
BOOST_FOREACH(tie(tuples::ignore, object), DynamicType::GetByName("Endpoint")->GetObjects()) {
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
void EndpointManager::RequestTimerHandler(void)
{
- recursive_mutex::scoped_lock lock(Application::GetMutex());
-
map<String, PendingRequest>::iterator it;
for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
if (it->second.HasTimedOut()) {