Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1));
Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1));
- DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ObjectRemovedHandler, this, _1));
- boost::thread thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
- thread.detach();
+ m_Stopped = false;
+
+ m_Thread = thread(boost::bind(&CheckerComponent::CheckThreadProc, this));
m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5);
void CheckerComponent::Stop(void)
{
m_Endpoint->Unregister();
+
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_Stopped = true;
+ m_CV.notify_all();
+ }
+
+ m_Thread.join();
}
void CheckerComponent::CheckThreadProc(void)
typedef nth_index<ServiceSet, 1>::type CheckTimeView;
CheckTimeView& idx = boost::get<1>(m_IdleServices);
- while (idx.begin() == idx.end())
+ while (idx.begin() == idx.end() && !m_Stopped)
m_CV.wait(lock);
+ if (m_Stopped)
+ break;
+
CheckTimeView::iterator it = idx.begin();
service = it->lock();
/* Wait for the next check. */
boost::mutex::scoped_lock lock(m_Mutex);
- m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
+ if (!m_Stopped)
+ m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000));
continue;
}
}
}
-void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object)
-{
- Service::Ptr service = dynamic_pointer_cast<Service>(object);
-
- /* ignore it if the removed object is not a service */
- if (!service)
- return;
-
- {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- m_IdleServices.erase(service);
- m_PendingServices.erase(service);
- m_CV.notify_all();
- }
-}
boost::mutex m_Mutex;
boost::condition_variable m_CV;
+ bool m_Stopped;
+ thread m_Thread;
ServiceSet m_IdleServices;
ServiceSet m_PendingServices;
void CheckerChangedHandler(const Service::Ptr& service);
void NextCheckChangedHandler(const Service::Ptr& service);
- void ObjectRemovedHandler(const DynamicObject::Ptr& object);
void RescheduleCheckTimer(void);
};
void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host)
{
- ObjectLock olock(host);
+ Service::Ptr hc;
+
+ {
+ ObjectLock olock(host);
+ hc = host->GetHostCheckService();
+
+ fp << "hoststatus {" << "\n"
+ << "\t" << "host_name=" << host->GetName() << "\n";
+ }
+
+ ServiceState hcState = StateOK;
+
+ if (hc) {
+ ObjectLock olock(hc);
+ hcState = hc->GetState();
+ }
int state;
- if (!host->IsReachable())
+ if (!Host::IsReachable(host))
state = 2; /* unreachable */
- else if (!host->IsUp())
+ else if (hcState != StateOK)
state = 1; /* down */
else
state = 0; /* up */
- fp << "hoststatus {" << "\n"
- << "\t" << "host_name=" << host->GetName() << "\n";
-
- Service::Ptr hostcheck = host->GetHostCheckService();
-
- if (hostcheck) {
- DumpServiceStatusAttrs(fp, hostcheck, CompatTypeHost);
- }
+ if (hc)
+ DumpServiceStatusAttrs(fp, hc, CompatTypeHost);
fp << "\t" << "}" << "\n"
<< "\n";
- if (hostcheck) {
- DumpDowntimes(fp, hostcheck, CompatTypeHost);
- DumpComments(fp, hostcheck, CompatTypeHost);
+ if (hc) {
+ DumpDowntimes(fp, hc, CompatTypeHost);
+ DumpComments(fp, hc, CompatTypeHost);
}
}
void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& service, CompatObjectType type)
{
- ObjectLock olock(service);
-
String output;
String perfdata;
double schedule_start = -1, schedule_end = -1;
double execution_start = -1, execution_end = -1;
- Dictionary::Ptr cr = service->GetLastCheckResult();
+ Dictionary::Ptr cr;
+ int state;
+ Host::Ptr host;
+
+ {
+ ObjectLock olock(service);
+
+ cr = service->GetLastCheckResult();
+ state = service->GetState();
+ host = service->GetHost();
+ }
+
if (cr) {
output = cr->Get("output");
schedule_start = cr->Get("schedule_start");
double execution_time = (execution_end - execution_start);
double latency = (schedule_end - schedule_start) - execution_time;
- int state = service->GetState();
-
if (state > StateUnknown)
state = StateUnknown;
else
state = 1;
- if (!service->GetHost()->IsReachable())
+ if (Host::IsReachable(host))
state = 2;
}
- fp << "\t" << "check_interval=" << service->GetCheckInterval() / 60.0 << "\n"
- << "\t" << "retry_interval=" << service->GetRetryInterval() / 60.0 << "\n"
- << "\t" << "has_been_checked=" << (service->GetLastCheckResult() ? 1 : 0) << "\n"
- << "\t" << "should_be_scheduled=1" << "\n"
- << "\t" << "check_execution_time=" << execution_time << "\n"
- << "\t" << "check_latency=" << latency << "\n"
- << "\t" << "current_state=" << state << "\n"
- << "\t" << "state_type=" << service->GetStateType() << "\n"
- << "\t" << "plugin_output=" << output << "\n"
- << "\t" << "performance_data=" << perfdata << "\n"
- << "\t" << "last_check=" << schedule_end << "\n"
- << "\t" << "next_check=" << service->GetNextCheck() << "\n"
- << "\t" << "current_attempt=" << service->GetCurrentCheckAttempt() << "\n"
- << "\t" << "max_attempts=" << service->GetMaxCheckAttempts() << "\n"
- << "\t" << "last_state_change=" << service->GetLastStateChange() << "\n"
- << "\t" << "last_hard_state_change=" << service->GetLastHardStateChange() << "\n"
- << "\t" << "last_update=" << time(NULL) << "\n"
- << "\t" << "active_checks_enabled=" << (service->GetEnableActiveChecks() ? 1 : 0) <<"\n"
- << "\t" << "passive_checks_enabled=" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
- << "\t" << "problem_has_been_acknowledged=" << (service->GetAcknowledgement() != AcknowledgementNone ? 1 : 0) << "\n"
- << "\t" << "acknowledgement_type=" << static_cast<int>(service->GetAcknowledgement()) << "\n"
- << "\t" << "acknowledgement_end_time=" << service->GetAcknowledgementExpiry() << "\n"
- << "\t" << "scheduled_downtime_depth=" << (service->IsInDowntime() ? 1 : 0) << "\n"
- << "\t" << "last_notification=" << service->GetLastNotification() << "\n"
- << "\t" << "next_notification=" << service->GetNextNotification() << "\n";
+ {
+ ObjectLock olock(service);
+
+ fp << "\t" << "check_interval=" << service->GetCheckInterval() / 60.0 << "\n"
+ << "\t" << "retry_interval=" << service->GetRetryInterval() / 60.0 << "\n"
+ << "\t" << "has_been_checked=" << (service->GetLastCheckResult() ? 1 : 0) << "\n"
+ << "\t" << "should_be_scheduled=1" << "\n"
+ << "\t" << "check_execution_time=" << execution_time << "\n"
+ << "\t" << "check_latency=" << latency << "\n"
+ << "\t" << "current_state=" << state << "\n"
+ << "\t" << "state_type=" << service->GetStateType() << "\n"
+ << "\t" << "plugin_output=" << output << "\n"
+ << "\t" << "performance_data=" << perfdata << "\n"
+ << "\t" << "last_check=" << schedule_end << "\n"
+ << "\t" << "next_check=" << service->GetNextCheck() << "\n"
+ << "\t" << "current_attempt=" << service->GetCurrentCheckAttempt() << "\n"
+ << "\t" << "max_attempts=" << service->GetMaxCheckAttempts() << "\n"
+ << "\t" << "last_state_change=" << service->GetLastStateChange() << "\n"
+ << "\t" << "last_hard_state_change=" << service->GetLastHardStateChange() << "\n"
+ << "\t" << "last_update=" << time(NULL) << "\n"
+ << "\t" << "active_checks_enabled=" << (service->GetEnableActiveChecks() ? 1 : 0) <<"\n"
+ << "\t" << "passive_checks_enabled=" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
+ << "\t" << "problem_has_been_acknowledged=" << (service->GetAcknowledgement() != AcknowledgementNone ? 1 : 0) << "\n"
+ << "\t" << "acknowledgement_type=" << static_cast<int>(service->GetAcknowledgement()) << "\n"
+ << "\t" << "acknowledgement_end_time=" << service->GetAcknowledgementExpiry() << "\n"
+ << "\t" << "scheduled_downtime_depth=" << (service->IsInDowntime() ? 1 : 0) << "\n"
+ << "\t" << "last_notification=" << service->GetLastNotification() << "\n"
+ << "\t" << "next_notification=" << service->GetNextNotification() << "\n";
+ }
}
void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& service)
{
- ObjectLock olock(service);
+ String host_name, short_name;
+ Host::Ptr host;
+
+ {
+ ObjectLock olock(service);
+ short_name = service->GetShortName();
+ host = service->GetHost();
+ }
+
+ {
+ ObjectLock olock(host);
+ host_name = host->GetName();
+ }
fp << "servicestatus {" << "\n"
- << "\t" << "host_name=" << service->GetHost()->GetName() << "\n"
- << "\t" << "service_description=" << service->GetShortName() << "\n";
+ << "\t" << "host_name=" << host_name << "\n"
+ << "\t" << "service_description=" << short_name << "\n";
DumpServiceStatusAttrs(fp, service, CompatTypeService);
void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& service)
{
- ObjectLock olock(service);
-
- fp << "define service {" << "\n"
- << "\t" << "host_name" << "\t" << service->GetHost()->GetName() << "\n"
- << "\t" << "service_description" << "\t" << service->GetShortName() << "\n"
- << "\t" << "display_name" << "\t" << service->GetDisplayName() << "\n"
- << "\t" << "check_command" << "\t" << "check_i2" << "\n"
- << "\t" << "check_interval" << "\t" << service->GetCheckInterval() / 60.0 << "\n"
- << "\t" << "retry_interval" << "\t" << service->GetRetryInterval() / 60.0 << "\n"
- << "\t" << "max_check_attempts" << "\t" << 1 << "\n"
- << "\t" << "active_checks_enabled" << "\t" << (service->GetEnableActiveChecks() ? 1 : 0) << "\n"
- << "\t" << "passive_checks_enabled" << "\t" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
- << "\t" << "}" << "\n"
- << "\n";
+ set<Service::Ptr> parentServices;
+ Host::Ptr host;
+ String host_name, short_name;
- BOOST_FOREACH(const Service::Ptr& parent, service->GetParentServices()) {
+ {
+ ObjectLock olock(service);
+ parentServices = service->GetParentServices();
+ host = service->GetHost();
+ short_name = service->GetShortName();
+ }
+
+ {
+ ObjectLock olock(host);
+ host_name = host->GetName();
+ }
+
+ {
+ ObjectLock olock(service);
+
+ fp << "define service {" << "\n"
+ << "\t" << "host_name" << "\t" << host_name << "\n"
+ << "\t" << "service_description" << "\t" << short_name << "\n"
+ << "\t" << "display_name" << "\t" << service->GetDisplayName() << "\n"
+ << "\t" << "check_command" << "\t" << "check_i2" << "\n"
+ << "\t" << "check_interval" << "\t" << service->GetCheckInterval() / 60.0 << "\n"
+ << "\t" << "retry_interval" << "\t" << service->GetRetryInterval() / 60.0 << "\n"
+ << "\t" << "max_check_attempts" << "\t" << 1 << "\n"
+ << "\t" << "active_checks_enabled" << "\t" << (service->GetEnableActiveChecks() ? 1 : 0) << "\n"
+ << "\t" << "passive_checks_enabled" << "\t" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n"
+ << "\t" << "}" << "\n"
+ << "\n";
+ }
+
+ BOOST_FOREACH(const Service::Ptr& parent, parentServices) {
ObjectLock plock(parent);
fp << "define servicedependency {" << "\n"
- << "\t" << "dependent_host_name" << "\t" << service->GetHost()->GetName() << "\n"
+ << "\t" << "dependent_host_name" << "\t" << host_name << "\n"
<< "\t" << "dependent_service_description" << "\t" << service->GetShortName() << "\n"
<< "\t" << "host_name" << "\t" << parent->GetHost()->GetName() << "\n"
- << "\t" << "service_description" << "\t" << parent->GetShortName() << "\n"
+ << "\t" << "service_description" << "\t" << short_name << "\n"
<< "\t" << "execution_failure_criteria" << "\t" << "n" << "\n"
<< "\t" << "notification_failure_criteria" << "\t" << "w,u,c" << "\n"
<< "\t" << "}" << "\n"
}
BOOST_FOREACH(const DynamicObject::Ptr& object, DynamicType::GetObjects("HostGroup")) {
- HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
- ObjectLock olock(hg);
+ set<Host::Ptr> members;
+
+ {
+ HostGroup::Ptr hg = static_pointer_cast<HostGroup>(object);
+ ObjectLock olock(hg);
+
+ objectfp << "define hostgroup {" << "\n"
+ << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
+ << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
+ << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
- objectfp << "define hostgroup {" << "\n"
- << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n"
- << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n"
- << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n";
+ members = hg->GetMembers();
+ }
objectfp << "\t" << "members" << "\t";
- DumpNameList(objectfp, hg->GetMembers());
+ DumpNameList(objectfp, members);
objectfp << "\n"
<< "}" << "\n";
}
}
BOOST_FOREACH(const DynamicObject::Ptr& object, DynamicType::GetObjects("ServiceGroup")) {
- ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
- ObjectLock olock(sg);
+ set<Service::Ptr> members;
- objectfp << "define servicegroup {" << "\n"
- << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
- << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
- << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
+ {
+ ServiceGroup::Ptr sg = static_pointer_cast<ServiceGroup>(object);
+ ObjectLock olock(sg);
- objectfp << "\t" << "members" << "\t";
+ objectfp << "define servicegroup {" << "\n"
+ << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n"
+ << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n"
+ << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n";
- vector<String> sglist;
- BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) {
- ObjectLock slock(service);
- Host::Ptr host = service->GetHost();
+ members = sg->GetMembers();
+ }
- ObjectLock hlock(host);
- sglist.push_back(host->GetName());
+ objectfp << "\t" << "members" << "\t";
- sglist.push_back(service->GetShortName());
+ vector<String> sglist;
+ BOOST_FOREACH(const Service::Ptr& service, members) {
+ Host::Ptr host;
+ String host_name, short_name;
+
+ {
+ ObjectLock olock(service);
+ host = service->GetHost();
+ short_name = service->GetShortName();
+ }
+
+ {
+ ObjectLock olock(host);
+ host_name = host->GetName();
+ }
+
+ sglist.push_back(host_name);
+ sglist.push_back(short_name);
}
DumpStringList(objectfp, sglist);
Logger::Write(LogDebug, "compatido", log.str());
int state;
- if (!host->IsReachable())
+ if (!Host::IsReachable(host))
state = 2; /* unreachable */
- else if (!host->IsUp())
+ else if (host->GetHostCheckService()->GetState() != StateOK)
state = 1; /* down */
else
state = 0; /* up */
DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1));
DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1));
DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2));
+ DynamicObject::OnFlushObject.connect(boost::bind(&ReplicationComponent::FlushObjectHandler, this, _1));
Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1));
if (!object)
continue;
- if (!ShouldReplicateObject(object))
- continue;
+ FlushObjectHandler(object);
+ }
+}
- RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true);
- EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
+void ReplicationComponent::FlushObjectHandler(const DynamicObject::Ptr& object)
+{
+ if (!ShouldReplicateObject(object))
+ return;
+
+ RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true);
+
+ EndpointManager::Ptr em = EndpointManager::GetInstance();
+ {
+ ObjectLock olock(em);
+ em->SendMulticastMessage(m_Endpoint, request);
}
}
void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
void TransactionClosingHandler(const set<DynamicObject::WeakPtr>& modifiedObjects);
+ void FlushObjectHandler(const DynamicObject::Ptr& object);
void RemoteObjectUpdateHandler(const RequestMessage& request);
void RemoteObjectRemovedHandler(const RequestMessage& request);
return EXIT_FAILURE;
}
- DynamicObject::NewTx();
-
bool validateOnly = g_AppParams.count("validate");
if (!LoadConfigFiles(validateOnly))
return EXIT_FAILURE;
- DynamicObject::NewTx();
-
if (validateOnly) {
Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate.");
return EXIT_SUCCESS;
flushTxTimer->Start();
#endif /* _DEBUG */
+ Timer::Initialize();
+
GetEQ().Join();
+
+ Timer::Uninitialize();
}
/**
SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE);
#endif /* _WIN32 */
- DynamicObject::NewTx();
-
result = Main();
- DynamicObject::NewTx();
DynamicObject::DeactivateObjects();
return result;
*/
TResult GetResult(void)
{
- boost::mutex::scoped_lock lock(m_Mutex);
if (!m_Finished)
BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask"));
*/
void FinishException(const boost::exception_ptr& ex)
{
- boost::mutex::scoped_lock lock(m_Mutex);
m_Exception = ex;
FinishInternal();
}
*/
void FinishResult(const TResult& result)
{
- boost::mutex::scoped_lock lock(m_Mutex);
m_Result = result;
FinishInternal();
}
/**
* Finishes the task and causes the completion callback to be invoked. This
* function must be called before the object is destroyed.
- *
- * @threadsafety Caller must hold m_Mutex.
*/
void FinishInternal(void)
{
- assert(!m_Finished);
+ CompletionCallback callback;
- m_Finished = true;
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ assert(!m_Finished);
- m_CV.notify_all();
+ m_Finished = true;
- if (!m_CompletionCallback.empty()) {
- Utility::QueueAsyncCallback(boost::bind(m_CompletionCallback, GetSelf()));
+ m_CV.notify_all();
- /* Clear callback because the bound function might hold a
- * reference to this task. */
- m_CompletionCallback = CompletionCallback();
+ m_CompletionCallback.swap(callback);
}
+
+ if (!callback.empty())
+ Utility::QueueAsyncCallback(boost::bind(callback, GetSelf()));
}
mutable boost::mutex m_Mutex;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> DynamicObject::OnTransactionClosing;
+signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnFlushObject;
DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
: m_Events(false), m_ConfigTx(0)
{
boost::mutex::scoped_lock lock(m_TransactionMutex);
- assert(m_CurrentTx != 0);
+ if (m_CurrentTx == 0) {
+ /* Set the initial transaction ID. */
+ m_CurrentTx = Utility::GetTime();
+ }
return m_CurrentTx;
}
+void DynamicObject::Flush(void)
+{
+ SendLocalUpdateEvents();
+ OnFlushObject(GetSelf());
+}
+
/*
* @threadsafety Always. Caller must not hold any Object locks.
*/
static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
static signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> OnTransactionClosing;
+ static signals2::signal<void (const DynamicObject::Ptr&)> OnFlushObject;
ScriptTask::Ptr MakeMethodTask(const String& method,
const vector<Value>& arguments);
void SetTx(double tx);
double GetTx(void) const;
+ void Flush(void);
+
void Register(void);
void Unregister(void);
static void DeactivateObjects(void);
static double GetCurrentTx(void);
- static void NewTx(void);
protected:
virtual void OnInitCompleted(void);
static double m_CurrentTx;
+ static void NewTx(void);
+
/* This has to be a set of raw pointers because the DynamicObject
* constructor has to be able to insert objects into this list. */
static set<DynamicObject::WeakPtr> m_ModifiedObjects;
ObjectLock olock(object);
object->SetEvents(true);
- if (m_ObjectMap.find(object->GetName()) != m_ObjectMap.end())
+ ObjectMap::iterator it = m_ObjectMap.find(object->GetName());
+
+ if (it != m_ObjectMap.end()) {
+ if (it->second == object)
+ return;
+
BOOST_THROW_EXCEPTION(runtime_error("RegisterObject() found existing object with the same name: " + object->GetName()));
+ }
m_ObjectMap[object->GetName()] = object;
m_ObjectSet.insert(object);
+
+ /* notify the object that it's been fully initialized */
+ object->OnInitCompleted();
}
void DynamicType::UnregisterObject(const DynamicObject::Ptr& object)
DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUpdate) const
{
- DynamicObject::Ptr obj = m_ObjectFactory(serializedUpdate);
- ObjectLock olock(obj);
+ DynamicObject::Ptr object = m_ObjectFactory(serializedUpdate);
+ ObjectLock olock(object);
/* register attributes */
String name;
DynamicAttributeType type;
BOOST_FOREACH(tuples::tie(name, type), m_Attributes)
- obj->RegisterAttribute(name, type);
+ object->RegisterAttribute(name, type);
/* apply the object's non-config attributes */
- obj->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config);
-
- /* notify the object that it's been fully initialized */
- obj->OnInitCompleted();
+ object->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config);
- return obj;
+ return object;
}
/**
if (thread_count < 4)
thread_count = 4;
- thread_count *= 8;
+ thread_count *= 4;
for (int i = 0; i < thread_count; i++)
m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
*/
void EventQueue::QueueThreadProc(void)
{
- while (!m_Stopped) {
+ for (;;) {
vector<Callback> events;
{
while (m_Events.empty() && !m_Stopped)
m_CV.wait(lock);
+ if (m_Stopped)
+ break;
+
events.swap(m_Events);
}
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Events.push_back(callback);
- m_CV.notify_all();
+ m_CV.notify_one();
}
*/
Object::SharedPtrHolder Object::GetSelf(void)
{
- ObjectLock olock(this);
return Object::SharedPtrHolder(shared_from_this());
}
if (fd >= 0)
tasks[fd] = task;
} catch (...) {
- Application::GetEQ().Post(boost::bind(&Process::FinishException, task, boost::current_exception()));
+ ObjectLock olock(task);
+ task->FinishException(boost::current_exception());
}
}
prev = it;
tasks.erase(prev);
- Application::GetEQ().Post(boost::bind(&Process::FinishResult, task, task->m_Result));
+ ObjectLock olock(task);
+ task->FinishResult(task->m_Result);
}
}
}
Process::Process(const vector<String>& arguments, const Dictionary::Ptr& extraEnvironment)
: AsyncTask<Process, ProcessResult>(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment)
{
- boost::call_once(&Process::Initialize, m_ThreadOnce);
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ boost::call_once(&Process::Initialize, m_ThreadOnce);
+ }
#ifndef _WIN32
m_FD = -1;
strftime(timestamp, sizeof(timestamp), "%Y/%m/%d %H:%M:%S %z", &tmnow);
+ boost::mutex::scoped_lock lock(m_Mutex);
+
if (tty) {
switch (entry.Severity) {
case LogWarning:
}
}
- boost::mutex::scoped_lock lock(m_Mutex);
stream << "[" << timestamp << "] "
<< Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": "
<< entry.Message;
using namespace icinga;
Timer::TimerSet Timer::m_Timers;
+thread Timer::m_Thread;
boost::mutex Timer::m_Mutex;
boost::condition_variable Timer::m_CV;
-boost::once_flag Timer::m_ThreadOnce = BOOST_ONCE_INIT;
+bool Timer::m_StopThread;
/**
* Extracts the next timestamp from a Timer.
*/
void Timer::Initialize(void)
{
- thread worker(boost::bind(&Timer::TimerThreadProc));
- worker.detach();
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_StopThread = false;
+ m_Thread = thread(boost::bind(&Timer::TimerThreadProc));
+}
+
+/**
+ * Disables the timer sub-system.
+ *
+ * @threadsafety Always.
+ */
+void Timer::Uninitialize(void)
+{
+ {
+ boost::mutex::scoped_lock lock(m_Mutex);
+ m_StopThread = true;
+ m_CV.notify_all();
+ }
+
+ m_Thread.join();
}
/**
*/
void Timer::Call(void)
{
- double st = Utility::GetTime();
-
OnTimerExpired(GetSelf());
- double et = Utility::GetTime();
-
- if (et - st > 1.0) {
- stringstream msgbuf;
- msgbuf << "Timer call took " << et - st << " seconds.";
- Logger::Write(LogWarning, "base", msgbuf.str());
- }
-
/* Re-enable the timer so it can be called again. */
m_Started = true;
Reschedule();
*/
void Timer::Start(void)
{
- boost::call_once(&Timer::Initialize, m_ThreadOnce);
-
m_Started = true;
Reschedule();
NextTimerView& idx = boost::get<1>(m_Timers);
/* Wait until there is at least one timer. */
- while (idx.empty())
+ while (idx.empty() && !m_StopThread)
m_CV.wait(lock);
+ if (m_StopThread)
+ break;
+
NextTimerView::iterator it = idx.begin();
Timer::Ptr timer = it->lock();
signals2::signal<void(const Timer::Ptr&)> OnTimerExpired;
+ static void Initialize(void);
+ static void Uninitialize(void);
+
private:
double m_Interval; /**< The interval of the timer. */
double m_Next; /**< When the next event should happen. */
static boost::mutex m_Mutex;
static boost::condition_variable m_CV;
+ static thread m_Thread;
+ static bool m_StopThread;
static TimerSet m_Timers;
void Call(void);
- static boost::once_flag m_ThreadOnce;
- static void Initialize(void);
-
static void TimerThreadProc(void);
friend struct TimerNextExtractor;
vector<ConfigItem::Ptr> obsoleteItems;
- {
- boost::mutex::scoped_lock lock(m_Mutex);
-
- ConfigItem::Ptr item;
- BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
- ObjectLock olock(item);
+ ConfigItem::Ptr item;
+ BOOST_FOREACH(tie(tuples::ignore, item), m_Items) {
+ ObjectLock olock(item);
- if (item->GetUnit() != unit)
- continue;
+ if (item->GetUnit() != unit)
+ continue;
- obsoleteItems.push_back(item);
- }
+ obsoleteItems.push_back(item);
}
BOOST_FOREACH(const ConfigItem::Ptr& item, obsoleteItems) {
Host::Ptr host = Host::GetByName(arguments[0]);
- if (host->IsUp())
- BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
-
Logger::Write(LogInformation, "icinga", "Setting acknowledgement for host '" + host->GetName() + "'");
Service::Ptr service = host->GetHostCheckService();
if (service) {
+ if (service->GetState() == StateOK)
+ BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
+
service->SetAcknowledgement(sticky ? AcknowledgementSticky : AcknowledgementNormal);
service->SetAcknowledgementExpiry(0);
}
Host::Ptr host = Host::GetByName(arguments[0]);
- if (host->IsUp())
- BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
-
Logger::Write(LogInformation, "icinga", "Setting timed acknowledgement for host '" + host->GetName() + "'");
Service::Ptr service = host->GetHostCheckService();
if (service) {
+ if (service->GetState() == StateOK)
+ BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK."));
+
service->SetAcknowledgement(sticky ? AcknowledgementSticky : AcknowledgementNormal);
service->SetAcknowledgementExpiry(timestamp);
}
Host::Host(const Dictionary::Ptr& properties)
: DynamicObject(properties)
-{ }
+{
+ HostGroup::InvalidateMembersCache();
+}
+
+void Host::OnInitCompleted(void)
+{
+ UpdateSlaveServices();
+}
Host::~Host(void)
{
return Get("hostcheck");
}
-bool Host::IsReachable(void)
+bool Host::IsReachable(const Host::Ptr& self)
{
- BOOST_FOREACH(const Service::Ptr& service, GetParentServices()) {
+ set<Service::Ptr> parentServices;
+
+ {
+ ObjectLock olock(self);
+ parentServices = self->GetParentServices();
+ }
+
+ BOOST_FOREACH(const Service::Ptr& service, parentServices) {
+ ObjectLock olock(service);
+
/* ignore pending services */
if (!service->GetLastCheckResult())
continue;
return false;
}
- BOOST_FOREACH(const Host::Ptr& host, GetParentHosts()) {
- /* ignore hosts that are up */
- if (host->IsUp())
- continue;
+ set<Host::Ptr> parentHosts;
- return false;
+ {
+ ObjectLock olock(self);
+ parentHosts = self->GetParentHosts();
}
- return true;
-}
-
-bool Host::IsInDowntime(void) const
-{
- Service::Ptr service = GetHostCheckService();
-
- if (!service)
- return false;
+ BOOST_FOREACH(const Host::Ptr& host, parentHosts) {
+ Service::Ptr hc;
- ObjectLock olock(service);
- return (service || service->IsInDowntime());
-}
+ {
+ ObjectLock olock(host);
+ hc = host->GetHostCheckService();
+ }
-bool Host::IsUp(void) const
-{
- Service::Ptr service = GetHostCheckService();
+ /* ignore hosts that are up */
+ if (hc && hc->GetState() == StateOK)
+ continue;
- if (!service)
- return true;
+ return false;
+ }
- ObjectLock olock(service);
- return (service->GetState() == StateOK || service->GetState() == StateWarning);
+ return true;
}
template<bool copyServiceAttrs, typename TDict>
{
if (name == "hostgroups")
HostGroup::InvalidateMembersCache();
- else if (name == "services")
+ else if (name == "services") {
+ ObjectLock olock(this);
UpdateSlaveServices();
- else if (name == "notifications") {
- BOOST_FOREACH(const Service::Ptr& service, GetServices()) {
+ } else if (name == "notifications") {
+ set<Service::Ptr> services;
+
+ {
+ ObjectLock olock(this);
+ services = GetServices();
+ }
+
+ BOOST_FOREACH(const Service::Ptr& service, services) {
+ ObjectLock olock(service);
service->UpdateSlaveNotifications();
}
}
set<Host::Ptr> GetParentHosts(void) const;
set<shared_ptr<Service> > GetParentServices(void) const;
- bool IsReachable(void);
- bool IsInDowntime(void) const;
- bool IsUp(void) const;
+ static bool IsReachable(const Host::Ptr& self);
shared_ptr<Service> GetServiceByShortName(const Value& name) const;
const std::vector<icinga::Value>& arguments);
protected:
+ void OnInitCompleted(void);
void OnAttributeChanged(const String& name, const Value& oldValue);
private:
m_Tasks.erase(task);
try {
- (void) task->GetResult();
+ {
+ ObjectLock tlock(task);
+ (void) task->GetResult();
+ }
Logger::Write(LogInformation, "icinga", "Completed sending notification for service '" + GetService()->GetName() + "'");
} catch (const exception& ex) {
ProcessResult pr;
try {
- pr = ct.m_Process->GetResult();
+ {
+ ObjectLock tlock(ct.m_Process);
+ pr = ct.m_Process->GetResult();
+ }
if (pr.ExitStatus != 0) {
stringstream msgbuf;
/* Make sure the notification component sees the updated
* state/state_type attributes. */
- DynamicObject::NewTx();
+ Flush();
- if (IsReachable() && !IsInDowntime() && !IsAcknowledged())
+ if (IsReachable(GetSelf()) && !IsInDowntime() && !IsAcknowledged())
RequestNotifications(NotificationStateChange);
}
}
Dictionary::Ptr result;
try {
- Value vresult = task->GetResult();
+ Value vresult;
+
+ {
+ ObjectLock tlock(task);
+ vresult = task->GetResult();
+ }
if (vresult.IsObjectType<Dictionary>())
result = vresult;
Service::UpdateStatistics(cr);
- /* flush the current transaction so other instances see the service's
+ /* Flush the object so other instances see the service's
* new state when they receive the CheckResult message */
- DynamicObject::NewTx();
+ Flush();
RequestMessage rm;
rm.SetMethod("checker::CheckResult");
newNotifications = boost::make_shared<Dictionary>();
vector<Dictionary::Ptr> notificationDescsList;
- notificationDescsList.push_back(GetHost()->Get("notifications"));
+
+ String host_name;
+
+ {
+ Host::Ptr host = GetHost();
+ ObjectLock olock(host);
+
+ notificationDescsList.push_back(host->Get("notifications"));
+ host_name = host->GetName();
+ }
+
notificationDescsList.push_back(Get("notifications"));
BOOST_FOREACH(const Dictionary::Ptr& notificationDescs, notificationDescsList) {
ConfigItemBuilder::Ptr builder = boost::make_shared<ConfigItemBuilder>(item->GetDebugInfo());
builder->SetType("Notification");
builder->SetName(name);
- builder->AddExpression("host_name", OperatorSet, GetHost()->GetName());
+ builder->AddExpression("host_name", OperatorSet, host_name);
builder->AddExpression("service", OperatorSet, GetName());
CopyNotificationAttributes(this, builder);
Service::Service(const Dictionary::Ptr& serializedObject)
: DynamicObject(serializedObject)
-{ }
+{
+ ServiceGroup::InvalidateMembersCache();
+ Host::InvalidateServicesCache();
+ Service::InvalidateDowntimesCache();
+ Service::InvalidateCommentsCache();
+}
+
+void Service::OnInitCompleted(void)
+{
+ UpdateSlaveNotifications();
+}
Service::~Service(void)
{
return value;
}
-bool Service::IsReachable(void) const
+bool Service::IsReachable(const Service::Ptr& self)
{
- BOOST_FOREACH(const Service::Ptr& service, GetParentServices()) {
+ set<Service::Ptr> parentServices;
+
+ {
+ ObjectLock olock(self);
+ parentServices = self->GetParentServices();
+ }
+
+ BOOST_FOREACH(const Service::Ptr& service, parentServices) {
+ ObjectLock olock(service);
+
/* ignore pending services */
if (!service->GetLastCheckResult())
continue;
return false;
}
- BOOST_FOREACH(const Host::Ptr& host, GetParentHosts()) {
+ set<Host::Ptr> parentHosts;
+
+ {
+ ObjectLock olock(self);
+ parentHosts = self->GetParentHosts();
+ }
+
+ BOOST_FOREACH(const Host::Ptr& host, parentHosts) {
+ Service::Ptr hc;
+
+ {
+ ObjectLock olock(host);
+ hc = host->GetHostCheckService();
+ }
+
/* ignore hosts that are up */
- if (host->IsUp())
+ if (hc && hc->GetState() == StateOK)
continue;
return false;
void Service::OnAttributeChanged(const String& name, const Value& oldValue)
{
- ObjectLock olock(this);
-
if (name == "checker")
OnCheckerChanged(GetSelf(), oldValue);
else if (name == "next_check")
ServiceGroup::InvalidateMembersCache();
else if (name == "host_name" || name == "short_name") {
Host::InvalidateServicesCache();
- UpdateSlaveNotifications();
+
+ {
+ ObjectLock olock(this);
+ UpdateSlaveNotifications();
+ }
} else if (name == "downtimes")
Service::InvalidateDowntimesCache();
else if (name == "comments")
else if (name == "notifications")
UpdateSlaveNotifications();
else if (name == "check_interval") {
+ ObjectLock(this);
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
/* update the next check timestamp if we're the owner of this service */
set<Host::Ptr> GetParentHosts(void) const;
set<Service::Ptr> GetParentServices(void) const;
- bool IsReachable(void) const;
+ bool IsReachable(const Service::Ptr& self);
AcknowledgementType GetAcknowledgement(void);
void SetAcknowledgement(AcknowledgementType acknowledgement);
void SetNextNotification(double time);
protected:
+ virtual void OnInitCompleted(void);
virtual void OnAttributeChanged(const String& name, const Value& oldValue);
private: