From: Gunnar Beutner Date: Tue, 19 Feb 2013 22:02:08 +0000 (+0100) Subject: Fine-grained locks (WIP, Part 6). X-Git-Tag: v0.0.2~376 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=997ca3a77a35d52ed2f42ed593fb502ac33a8d40;p=icinga2 Fine-grained locks (WIP, Part 6). --- diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index dbc8617dc..e72566ace 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -33,10 +33,10 @@ void CheckerComponent::Start(void) Service::OnCheckerChanged.connect(bind(&CheckerComponent::CheckerChangedHandler, this, _1)); Service::OnNextCheckChanged.connect(bind(&CheckerComponent::NextCheckChangedHandler, this, _1)); - DynamicObject::OnUnregistered.connect(bind(&CheckerComponent::ObjectRemovedHandler, this, _1)); - boost::thread thread(boost::bind(&CheckerComponent::CheckThreadProc, this)); - thread.detach(); + m_Stopped = false; + + m_Thread = thread(boost::bind(&CheckerComponent::CheckThreadProc, this)); m_ResultTimer = boost::make_shared(); m_ResultTimer->SetInterval(5); @@ -47,6 +47,14 @@ void CheckerComponent::Start(void) void CheckerComponent::Stop(void) { m_Endpoint->Unregister(); + + { + boost::mutex::scoped_lock lock(m_Mutex); + m_Stopped = true; + m_CV.notify_all(); + } + + m_Thread.join(); } void CheckerComponent::CheckThreadProc(void) @@ -60,9 +68,12 @@ void CheckerComponent::CheckThreadProc(void) typedef nth_index::type CheckTimeView; CheckTimeView& idx = boost::get<1>(m_IdleServices); - while (idx.begin() == idx.end()) + while (idx.begin() == idx.end() && !m_Stopped) m_CV.wait(lock); + if (m_Stopped) + break; + CheckTimeView::iterator it = idx.begin(); service = it->lock(); @@ -85,7 +96,8 @@ void CheckerComponent::CheckThreadProc(void) /* Wait for the next check. */ boost::mutex::scoped_lock lock(m_Mutex); - m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000)); + if (!m_Stopped) + m_CV.timed_wait(lock, boost::posix_time::milliseconds(wait * 1000)); continue; } @@ -216,19 +228,3 @@ void CheckerComponent::NextCheckChangedHandler(const Service::Ptr& service) } } -void CheckerComponent::ObjectRemovedHandler(const DynamicObject::Ptr& object) -{ - Service::Ptr service = dynamic_pointer_cast(object); - - /* ignore it if the removed object is not a service */ - if (!service) - return; - - { - boost::mutex::scoped_lock lock(m_Mutex); - - m_IdleServices.erase(service); - m_PendingServices.erase(service); - m_CV.notify_all(); - } -} diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index 01c5a4191..f4f5704e0 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -69,6 +69,8 @@ private: boost::mutex m_Mutex; boost::condition_variable m_CV; + bool m_Stopped; + thread m_Thread; ServiceSet m_IdleServices; ServiceSet m_PendingServices; @@ -84,7 +86,6 @@ private: void CheckerChangedHandler(const Service::Ptr& service); void NextCheckChangedHandler(const Service::Ptr& service); - void ObjectRemovedHandler(const DynamicObject::Ptr& object); void RescheduleCheckTimer(void); }; diff --git a/components/compat/compatcomponent.cpp b/components/compat/compatcomponent.cpp index 578cc0a05..583f11b94 100644 --- a/components/compat/compatcomponent.cpp +++ b/components/compat/compatcomponent.cpp @@ -257,31 +257,40 @@ void CompatComponent::DumpDowntimes(ofstream& fp, const Service::Ptr& owner, Com void CompatComponent::DumpHostStatus(ofstream& fp, const Host::Ptr& host) { - ObjectLock olock(host); + Service::Ptr hc; + + { + ObjectLock olock(host); + hc = host->GetHostCheckService(); + + fp << "hoststatus {" << "\n" + << "\t" << "host_name=" << host->GetName() << "\n"; + } + + ServiceState hcState = StateOK; + + if (hc) { + ObjectLock olock(hc); + hcState = hc->GetState(); + } int state; - if (!host->IsReachable()) + if (!Host::IsReachable(host)) state = 2; /* unreachable */ - else if (!host->IsUp()) + else if (hcState != StateOK) state = 1; /* down */ else state = 0; /* up */ - fp << "hoststatus {" << "\n" - << "\t" << "host_name=" << host->GetName() << "\n"; - - Service::Ptr hostcheck = host->GetHostCheckService(); - - if (hostcheck) { - DumpServiceStatusAttrs(fp, hostcheck, CompatTypeHost); - } + if (hc) + DumpServiceStatusAttrs(fp, hc, CompatTypeHost); fp << "\t" << "}" << "\n" << "\n"; - if (hostcheck) { - DumpDowntimes(fp, hostcheck, CompatTypeHost); - DumpComments(fp, hostcheck, CompatTypeHost); + if (hc) { + DumpDowntimes(fp, hc, CompatTypeHost); + DumpComments(fp, hc, CompatTypeHost); } } @@ -312,14 +321,23 @@ void CompatComponent::DumpHostObject(ofstream& fp, const Host::Ptr& host) void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& service, CompatObjectType type) { - ObjectLock olock(service); - String output; String perfdata; double schedule_start = -1, schedule_end = -1; double execution_start = -1, execution_end = -1; - Dictionary::Ptr cr = service->GetLastCheckResult(); + Dictionary::Ptr cr; + int state; + Host::Ptr host; + + { + ObjectLock olock(service); + + cr = service->GetLastCheckResult(); + state = service->GetState(); + host = service->GetHost(); + } + if (cr) { output = cr->Get("output"); schedule_start = cr->Get("schedule_start"); @@ -332,8 +350,6 @@ void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& s double execution_time = (execution_end - execution_start); double latency = (schedule_end - schedule_start) - execution_time; - int state = service->GetState(); - if (state > StateUnknown) state = StateUnknown; @@ -343,44 +359,60 @@ void CompatComponent::DumpServiceStatusAttrs(ofstream& fp, const Service::Ptr& s else state = 1; - if (!service->GetHost()->IsReachable()) + if (Host::IsReachable(host)) state = 2; } - fp << "\t" << "check_interval=" << service->GetCheckInterval() / 60.0 << "\n" - << "\t" << "retry_interval=" << service->GetRetryInterval() / 60.0 << "\n" - << "\t" << "has_been_checked=" << (service->GetLastCheckResult() ? 1 : 0) << "\n" - << "\t" << "should_be_scheduled=1" << "\n" - << "\t" << "check_execution_time=" << execution_time << "\n" - << "\t" << "check_latency=" << latency << "\n" - << "\t" << "current_state=" << state << "\n" - << "\t" << "state_type=" << service->GetStateType() << "\n" - << "\t" << "plugin_output=" << output << "\n" - << "\t" << "performance_data=" << perfdata << "\n" - << "\t" << "last_check=" << schedule_end << "\n" - << "\t" << "next_check=" << service->GetNextCheck() << "\n" - << "\t" << "current_attempt=" << service->GetCurrentCheckAttempt() << "\n" - << "\t" << "max_attempts=" << service->GetMaxCheckAttempts() << "\n" - << "\t" << "last_state_change=" << service->GetLastStateChange() << "\n" - << "\t" << "last_hard_state_change=" << service->GetLastHardStateChange() << "\n" - << "\t" << "last_update=" << time(NULL) << "\n" - << "\t" << "active_checks_enabled=" << (service->GetEnableActiveChecks() ? 1 : 0) <<"\n" - << "\t" << "passive_checks_enabled=" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n" - << "\t" << "problem_has_been_acknowledged=" << (service->GetAcknowledgement() != AcknowledgementNone ? 1 : 0) << "\n" - << "\t" << "acknowledgement_type=" << static_cast(service->GetAcknowledgement()) << "\n" - << "\t" << "acknowledgement_end_time=" << service->GetAcknowledgementExpiry() << "\n" - << "\t" << "scheduled_downtime_depth=" << (service->IsInDowntime() ? 1 : 0) << "\n" - << "\t" << "last_notification=" << service->GetLastNotification() << "\n" - << "\t" << "next_notification=" << service->GetNextNotification() << "\n"; + { + ObjectLock olock(service); + + fp << "\t" << "check_interval=" << service->GetCheckInterval() / 60.0 << "\n" + << "\t" << "retry_interval=" << service->GetRetryInterval() / 60.0 << "\n" + << "\t" << "has_been_checked=" << (service->GetLastCheckResult() ? 1 : 0) << "\n" + << "\t" << "should_be_scheduled=1" << "\n" + << "\t" << "check_execution_time=" << execution_time << "\n" + << "\t" << "check_latency=" << latency << "\n" + << "\t" << "current_state=" << state << "\n" + << "\t" << "state_type=" << service->GetStateType() << "\n" + << "\t" << "plugin_output=" << output << "\n" + << "\t" << "performance_data=" << perfdata << "\n" + << "\t" << "last_check=" << schedule_end << "\n" + << "\t" << "next_check=" << service->GetNextCheck() << "\n" + << "\t" << "current_attempt=" << service->GetCurrentCheckAttempt() << "\n" + << "\t" << "max_attempts=" << service->GetMaxCheckAttempts() << "\n" + << "\t" << "last_state_change=" << service->GetLastStateChange() << "\n" + << "\t" << "last_hard_state_change=" << service->GetLastHardStateChange() << "\n" + << "\t" << "last_update=" << time(NULL) << "\n" + << "\t" << "active_checks_enabled=" << (service->GetEnableActiveChecks() ? 1 : 0) <<"\n" + << "\t" << "passive_checks_enabled=" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n" + << "\t" << "problem_has_been_acknowledged=" << (service->GetAcknowledgement() != AcknowledgementNone ? 1 : 0) << "\n" + << "\t" << "acknowledgement_type=" << static_cast(service->GetAcknowledgement()) << "\n" + << "\t" << "acknowledgement_end_time=" << service->GetAcknowledgementExpiry() << "\n" + << "\t" << "scheduled_downtime_depth=" << (service->IsInDowntime() ? 1 : 0) << "\n" + << "\t" << "last_notification=" << service->GetLastNotification() << "\n" + << "\t" << "next_notification=" << service->GetNextNotification() << "\n"; + } } void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& service) { - ObjectLock olock(service); + String host_name, short_name; + Host::Ptr host; + + { + ObjectLock olock(service); + short_name = service->GetShortName(); + host = service->GetHost(); + } + + { + ObjectLock olock(host); + host_name = host->GetName(); + } fp << "servicestatus {" << "\n" - << "\t" << "host_name=" << service->GetHost()->GetName() << "\n" - << "\t" << "service_description=" << service->GetShortName() << "\n"; + << "\t" << "host_name=" << host_name << "\n" + << "\t" << "service_description=" << short_name << "\n"; DumpServiceStatusAttrs(fp, service, CompatTypeService); @@ -393,29 +425,47 @@ void CompatComponent::DumpServiceStatus(ofstream& fp, const Service::Ptr& servic void CompatComponent::DumpServiceObject(ofstream& fp, const Service::Ptr& service) { - ObjectLock olock(service); - - fp << "define service {" << "\n" - << "\t" << "host_name" << "\t" << service->GetHost()->GetName() << "\n" - << "\t" << "service_description" << "\t" << service->GetShortName() << "\n" - << "\t" << "display_name" << "\t" << service->GetDisplayName() << "\n" - << "\t" << "check_command" << "\t" << "check_i2" << "\n" - << "\t" << "check_interval" << "\t" << service->GetCheckInterval() / 60.0 << "\n" - << "\t" << "retry_interval" << "\t" << service->GetRetryInterval() / 60.0 << "\n" - << "\t" << "max_check_attempts" << "\t" << 1 << "\n" - << "\t" << "active_checks_enabled" << "\t" << (service->GetEnableActiveChecks() ? 1 : 0) << "\n" - << "\t" << "passive_checks_enabled" << "\t" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n" - << "\t" << "}" << "\n" - << "\n"; + set parentServices; + Host::Ptr host; + String host_name, short_name; - BOOST_FOREACH(const Service::Ptr& parent, service->GetParentServices()) { + { + ObjectLock olock(service); + parentServices = service->GetParentServices(); + host = service->GetHost(); + short_name = service->GetShortName(); + } + + { + ObjectLock olock(host); + host_name = host->GetName(); + } + + { + ObjectLock olock(service); + + fp << "define service {" << "\n" + << "\t" << "host_name" << "\t" << host_name << "\n" + << "\t" << "service_description" << "\t" << short_name << "\n" + << "\t" << "display_name" << "\t" << service->GetDisplayName() << "\n" + << "\t" << "check_command" << "\t" << "check_i2" << "\n" + << "\t" << "check_interval" << "\t" << service->GetCheckInterval() / 60.0 << "\n" + << "\t" << "retry_interval" << "\t" << service->GetRetryInterval() / 60.0 << "\n" + << "\t" << "max_check_attempts" << "\t" << 1 << "\n" + << "\t" << "active_checks_enabled" << "\t" << (service->GetEnableActiveChecks() ? 1 : 0) << "\n" + << "\t" << "passive_checks_enabled" << "\t" << (service->GetEnablePassiveChecks() ? 1 : 0) << "\n" + << "\t" << "}" << "\n" + << "\n"; + } + + BOOST_FOREACH(const Service::Ptr& parent, parentServices) { ObjectLock plock(parent); fp << "define servicedependency {" << "\n" - << "\t" << "dependent_host_name" << "\t" << service->GetHost()->GetName() << "\n" + << "\t" << "dependent_host_name" << "\t" << host_name << "\n" << "\t" << "dependent_service_description" << "\t" << service->GetShortName() << "\n" << "\t" << "host_name" << "\t" << parent->GetHost()->GetName() << "\n" - << "\t" << "service_description" << "\t" << parent->GetShortName() << "\n" + << "\t" << "service_description" << "\t" << short_name << "\n" << "\t" << "execution_failure_criteria" << "\t" << "n" << "\n" << "\t" << "notification_failure_criteria" << "\t" << "w,u,c" << "\n" << "\t" << "}" << "\n" @@ -494,16 +544,22 @@ void CompatComponent::StatusTimerHandler(void) } BOOST_FOREACH(const DynamicObject::Ptr& object, DynamicType::GetObjects("HostGroup")) { - HostGroup::Ptr hg = static_pointer_cast(object); - ObjectLock olock(hg); + set members; + + { + HostGroup::Ptr hg = static_pointer_cast(object); + ObjectLock olock(hg); + + objectfp << "define hostgroup {" << "\n" + << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n" + << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n" + << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n"; - objectfp << "define hostgroup {" << "\n" - << "\t" << "hostgroup_name" << "\t" << hg->GetName() << "\n" - << "\t" << "notes_url" << "\t" << hg->GetNotesUrl() << "\n" - << "\t" << "action_url" << "\t" << hg->GetActionUrl() << "\n"; + members = hg->GetMembers(); + } objectfp << "\t" << "members" << "\t"; - DumpNameList(objectfp, hg->GetMembers()); + DumpNameList(objectfp, members); objectfp << "\n" << "}" << "\n"; } @@ -516,25 +572,40 @@ void CompatComponent::StatusTimerHandler(void) } BOOST_FOREACH(const DynamicObject::Ptr& object, DynamicType::GetObjects("ServiceGroup")) { - ServiceGroup::Ptr sg = static_pointer_cast(object); - ObjectLock olock(sg); + set members; - objectfp << "define servicegroup {" << "\n" - << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n" - << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n" - << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n"; + { + ServiceGroup::Ptr sg = static_pointer_cast(object); + ObjectLock olock(sg); - objectfp << "\t" << "members" << "\t"; + objectfp << "define servicegroup {" << "\n" + << "\t" << "servicegroup_name" << "\t" << sg->GetName() << "\n" + << "\t" << "notes_url" << "\t" << sg->GetNotesUrl() << "\n" + << "\t" << "action_url" << "\t" << sg->GetActionUrl() << "\n"; - vector sglist; - BOOST_FOREACH(const Service::Ptr& service, sg->GetMembers()) { - ObjectLock slock(service); - Host::Ptr host = service->GetHost(); + members = sg->GetMembers(); + } - ObjectLock hlock(host); - sglist.push_back(host->GetName()); + objectfp << "\t" << "members" << "\t"; - sglist.push_back(service->GetShortName()); + vector sglist; + BOOST_FOREACH(const Service::Ptr& service, members) { + Host::Ptr host; + String host_name, short_name; + + { + ObjectLock olock(service); + host = service->GetHost(); + short_name = service->GetShortName(); + } + + { + ObjectLock olock(host); + host_name = host->GetName(); + } + + sglist.push_back(host_name); + sglist.push_back(short_name); } DumpStringList(objectfp, sglist); diff --git a/components/compatido/compatidocomponent.cpp b/components/compatido/compatidocomponent.cpp index 3132b8994..982065ffe 100644 --- a/components/compatido/compatidocomponent.cpp +++ b/components/compatido/compatidocomponent.cpp @@ -438,9 +438,9 @@ void CompatIdoComponent::DumpHostStatus(const Host::Ptr& host) Logger::Write(LogDebug, "compatido", log.str()); int state; - if (!host->IsReachable()) + if (!Host::IsReachable(host)) state = 2; /* unreachable */ - else if (!host->IsUp()) + else if (host->GetHostCheckService()->GetState() != StateOK) state = 1; /* down */ else state = 0; /* up */ diff --git a/components/replication/replicationcomponent.cpp b/components/replication/replicationcomponent.cpp index 3fb30b7ca..cb81abf99 100644 --- a/components/replication/replicationcomponent.cpp +++ b/components/replication/replicationcomponent.cpp @@ -33,6 +33,7 @@ void ReplicationComponent::Start(void) DynamicObject::OnRegistered.connect(boost::bind(&ReplicationComponent::LocalObjectRegisteredHandler, this, _1)); DynamicObject::OnUnregistered.connect(boost::bind(&ReplicationComponent::LocalObjectUnregisteredHandler, this, _1)); DynamicObject::OnTransactionClosing.connect(boost::bind(&ReplicationComponent::TransactionClosingHandler, this, _2)); + DynamicObject::OnFlushObject.connect(boost::bind(&ReplicationComponent::FlushObjectHandler, this, _1)); Endpoint::OnConnected.connect(boost::bind(&ReplicationComponent::EndpointConnectedHandler, this, _1)); @@ -175,11 +176,21 @@ void ReplicationComponent::TransactionClosingHandler(const setSendMulticastMessage(m_Endpoint, request); +void ReplicationComponent::FlushObjectHandler(const DynamicObject::Ptr& object) +{ + if (!ShouldReplicateObject(object)) + return; + + RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true); + + EndpointManager::Ptr em = EndpointManager::GetInstance(); + { + ObjectLock olock(em); + em->SendMulticastMessage(m_Endpoint, request); } } diff --git a/components/replication/replicationcomponent.h b/components/replication/replicationcomponent.h index 4843836f4..85aa9b40b 100644 --- a/components/replication/replicationcomponent.h +++ b/components/replication/replicationcomponent.h @@ -42,6 +42,7 @@ private: void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object); void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object); void TransactionClosingHandler(const set& modifiedObjects); + void FlushObjectHandler(const DynamicObject::Ptr& object); void RemoteObjectUpdateHandler(const RequestMessage& request); void RemoteObjectRemovedHandler(const RequestMessage& request); diff --git a/icinga-app/icinga.cpp b/icinga-app/icinga.cpp index 01f21a5f9..124c7d68f 100644 --- a/icinga-app/icinga.cpp +++ b/icinga-app/icinga.cpp @@ -232,15 +232,11 @@ int main(int argc, char **argv) return EXIT_FAILURE; } - DynamicObject::NewTx(); - bool validateOnly = g_AppParams.count("validate"); if (!LoadConfigFiles(validateOnly)) return EXIT_FAILURE; - DynamicObject::NewTx(); - if (validateOnly) { Logger::Write(LogInformation, "icinga-app", "Terminating as requested by --validate."); return EXIT_SUCCESS; diff --git a/lib/base/application.cpp b/lib/base/application.cpp index 99218a524..952b80c69 100644 --- a/lib/base/application.cpp +++ b/lib/base/application.cpp @@ -149,7 +149,11 @@ void Application::RunEventLoop(void) const flushTxTimer->Start(); #endif /* _DEBUG */ + Timer::Initialize(); + GetEQ().Join(); + + Timer::Uninitialize(); } /** @@ -419,11 +423,8 @@ int Application::Run(void) SetConsoleCtrlHandler(&Application::CtrlHandler, TRUE); #endif /* _WIN32 */ - DynamicObject::NewTx(); - result = Main(); - DynamicObject::NewTx(); DynamicObject::DeactivateObjects(); return result; diff --git a/lib/base/asynctask.h b/lib/base/asynctask.h index 39c15d634..0c1e9115a 100644 --- a/lib/base/asynctask.h +++ b/lib/base/asynctask.h @@ -86,7 +86,6 @@ public: */ TResult GetResult(void) { - boost::mutex::scoped_lock lock(m_Mutex); if (!m_Finished) BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask")); @@ -110,7 +109,6 @@ public: */ void FinishException(const boost::exception_ptr& ex) { - boost::mutex::scoped_lock lock(m_Mutex); m_Exception = ex; FinishInternal(); } @@ -122,7 +120,6 @@ public: */ void FinishResult(const TResult& result) { - boost::mutex::scoped_lock lock(m_Mutex); m_Result = result; FinishInternal(); } @@ -149,24 +146,24 @@ private: /** * Finishes the task and causes the completion callback to be invoked. This * function must be called before the object is destroyed. - * - * @threadsafety Caller must hold m_Mutex. */ void FinishInternal(void) { - assert(!m_Finished); + CompletionCallback callback; - m_Finished = true; + { + boost::mutex::scoped_lock lock(m_Mutex); + assert(!m_Finished); - m_CV.notify_all(); + m_Finished = true; - if (!m_CompletionCallback.empty()) { - Utility::QueueAsyncCallback(boost::bind(m_CompletionCallback, GetSelf())); + m_CV.notify_all(); - /* Clear callback because the bound function might hold a - * reference to this task. */ - m_CompletionCallback = CompletionCallback(); + m_CompletionCallback.swap(callback); } + + if (!callback.empty()) + Utility::QueueAsyncCallback(boost::bind(callback, GetSelf())); } mutable boost::mutex m_Mutex; diff --git a/lib/base/dynamicobject.cpp b/lib/base/dynamicobject.cpp index 51e8f07c6..81dc93bee 100644 --- a/lib/base/dynamicobject.cpp +++ b/lib/base/dynamicobject.cpp @@ -30,6 +30,7 @@ Timer::Ptr DynamicObject::m_TransactionTimer; signals2::signal DynamicObject::OnRegistered; signals2::signal DynamicObject::OnUnregistered; signals2::signal&)> DynamicObject::OnTransactionClosing; +signals2::signal DynamicObject::OnFlushObject; DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject) : m_Events(false), m_ConfigTx(0) @@ -527,11 +528,20 @@ double DynamicObject::GetCurrentTx(void) { boost::mutex::scoped_lock lock(m_TransactionMutex); - assert(m_CurrentTx != 0); + if (m_CurrentTx == 0) { + /* Set the initial transaction ID. */ + m_CurrentTx = Utility::GetTime(); + } return m_CurrentTx; } +void DynamicObject::Flush(void) +{ + SendLocalUpdateEvents(); + OnFlushObject(GetSelf()); +} + /* * @threadsafety Always. Caller must not hold any Object locks. */ diff --git a/lib/base/dynamicobject.h b/lib/base/dynamicobject.h index e78d7cfad..ea18ff98e 100644 --- a/lib/base/dynamicobject.h +++ b/lib/base/dynamicobject.h @@ -99,6 +99,7 @@ public: static signals2::signal OnRegistered; static signals2::signal OnUnregistered; static signals2::signal&)> OnTransactionClosing; + static signals2::signal OnFlushObject; ScriptTask::Ptr MakeMethodTask(const String& method, const vector& arguments); @@ -115,6 +116,8 @@ public: void SetTx(double tx); double GetTx(void) const; + void Flush(void); + void Register(void); void Unregister(void); @@ -132,7 +135,6 @@ public: static void DeactivateObjects(void); static double GetCurrentTx(void); - static void NewTx(void); protected: virtual void OnInitCompleted(void); @@ -151,6 +153,8 @@ private: static double m_CurrentTx; + static void NewTx(void); + /* This has to be a set of raw pointers because the DynamicObject * constructor has to be able to insert objects into this list. */ static set m_ModifiedObjects; diff --git a/lib/base/dynamictype.cpp b/lib/base/dynamictype.cpp index 144e9069f..df76b4a87 100644 --- a/lib/base/dynamictype.cpp +++ b/lib/base/dynamictype.cpp @@ -83,11 +83,20 @@ void DynamicType::RegisterObject(const DynamicObject::Ptr& object) ObjectLock olock(object); object->SetEvents(true); - if (m_ObjectMap.find(object->GetName()) != m_ObjectMap.end()) + ObjectMap::iterator it = m_ObjectMap.find(object->GetName()); + + if (it != m_ObjectMap.end()) { + if (it->second == object) + return; + BOOST_THROW_EXCEPTION(runtime_error("RegisterObject() found existing object with the same name: " + object->GetName())); + } m_ObjectMap[object->GetName()] = object; m_ObjectSet.insert(object); + + /* notify the object that it's been fully initialized */ + object->OnInitCompleted(); } void DynamicType::UnregisterObject(const DynamicObject::Ptr& object) @@ -128,22 +137,19 @@ void DynamicType::RegisterType(const DynamicType::Ptr& type) DynamicObject::Ptr DynamicType::CreateObject(const Dictionary::Ptr& serializedUpdate) const { - DynamicObject::Ptr obj = m_ObjectFactory(serializedUpdate); - ObjectLock olock(obj); + DynamicObject::Ptr object = m_ObjectFactory(serializedUpdate); + ObjectLock olock(object); /* register attributes */ String name; DynamicAttributeType type; BOOST_FOREACH(tuples::tie(name, type), m_Attributes) - obj->RegisterAttribute(name, type); + object->RegisterAttribute(name, type); /* apply the object's non-config attributes */ - obj->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config); - - /* notify the object that it's been fully initialized */ - obj->OnInitCompleted(); + object->ApplyUpdate(serializedUpdate, Attribute_All & ~Attribute_Config); - return obj; + return object; } /** diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp index 7fe42a886..6eb47bab5 100644 --- a/lib/base/eventqueue.cpp +++ b/lib/base/eventqueue.cpp @@ -32,7 +32,7 @@ EventQueue::EventQueue(void) if (thread_count < 4) thread_count = 4; - thread_count *= 8; + thread_count *= 4; for (int i = 0; i < thread_count; i++) m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this)); @@ -74,7 +74,7 @@ void EventQueue::Join(void) */ void EventQueue::QueueThreadProc(void) { - while (!m_Stopped) { + for (;;) { vector events; { @@ -83,6 +83,9 @@ void EventQueue::QueueThreadProc(void) while (m_Events.empty() && !m_Stopped) m_CV.wait(lock); + if (m_Stopped) + break; + events.swap(m_Events); } @@ -112,5 +115,5 @@ void EventQueue::Post(const EventQueue::Callback& callback) { boost::mutex::scoped_lock lock(m_Mutex); m_Events.push_back(callback); - m_CV.notify_all(); + m_CV.notify_one(); } diff --git a/lib/base/object.cpp b/lib/base/object.cpp index 51e055609..c409d87a0 100644 --- a/lib/base/object.cpp +++ b/lib/base/object.cpp @@ -41,7 +41,6 @@ Object::~Object(void) */ Object::SharedPtrHolder Object::GetSelf(void) { - ObjectLock olock(this); return Object::SharedPtrHolder(shared_from_this()); } diff --git a/lib/base/process-unix.cpp b/lib/base/process-unix.cpp index 2331e37c9..9e852a765 100644 --- a/lib/base/process-unix.cpp +++ b/lib/base/process-unix.cpp @@ -133,7 +133,8 @@ void Process::WorkerThreadProc(int taskFd) if (fd >= 0) tasks[fd] = task; } catch (...) { - Application::GetEQ().Post(boost::bind(&Process::FinishException, task, boost::current_exception())); + ObjectLock olock(task); + task->FinishException(boost::current_exception()); } } @@ -148,7 +149,8 @@ void Process::WorkerThreadProc(int taskFd) prev = it; tasks.erase(prev); - Application::GetEQ().Post(boost::bind(&Process::FinishResult, task, task->m_Result)); + ObjectLock olock(task); + task->FinishResult(task->m_Result); } } } diff --git a/lib/base/process.cpp b/lib/base/process.cpp index d959c0102..58e8b2736 100644 --- a/lib/base/process.cpp +++ b/lib/base/process.cpp @@ -28,7 +28,10 @@ deque Process::m_Tasks; Process::Process(const vector& arguments, const Dictionary::Ptr& extraEnvironment) : AsyncTask(), m_Arguments(arguments), m_ExtraEnvironment(extraEnvironment) { - boost::call_once(&Process::Initialize, m_ThreadOnce); + { + boost::mutex::scoped_lock lock(m_Mutex); + boost::call_once(&Process::Initialize, m_ThreadOnce); + } #ifndef _WIN32 m_FD = -1; diff --git a/lib/base/streamlogger.cpp b/lib/base/streamlogger.cpp index 1196ee1ed..80ee402df 100644 --- a/lib/base/streamlogger.cpp +++ b/lib/base/streamlogger.cpp @@ -83,6 +83,8 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en strftime(timestamp, sizeof(timestamp), "%Y/%m/%d %H:%M:%S %z", &tmnow); + boost::mutex::scoped_lock lock(m_Mutex); + if (tty) { switch (entry.Severity) { case LogWarning: @@ -96,7 +98,6 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en } } - boost::mutex::scoped_lock lock(m_Mutex); stream << "[" << timestamp << "] " << Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": " << entry.Message; diff --git a/lib/base/timer.cpp b/lib/base/timer.cpp index b3cd64384..9d3098560 100644 --- a/lib/base/timer.cpp +++ b/lib/base/timer.cpp @@ -22,9 +22,10 @@ using namespace icinga; Timer::TimerSet Timer::m_Timers; +thread Timer::m_Thread; boost::mutex Timer::m_Mutex; boost::condition_variable Timer::m_CV; -boost::once_flag Timer::m_ThreadOnce = BOOST_ONCE_INIT; +bool Timer::m_StopThread; /** * Extracts the next timestamp from a Timer. @@ -59,8 +60,25 @@ Timer::Timer(void) */ void Timer::Initialize(void) { - thread worker(boost::bind(&Timer::TimerThreadProc)); - worker.detach(); + boost::mutex::scoped_lock lock(m_Mutex); + m_StopThread = false; + m_Thread = thread(boost::bind(&Timer::TimerThreadProc)); +} + +/** + * Disables the timer sub-system. + * + * @threadsafety Always. + */ +void Timer::Uninitialize(void) +{ + { + boost::mutex::scoped_lock lock(m_Mutex); + m_StopThread = true; + m_CV.notify_all(); + } + + m_Thread.join(); } /** @@ -70,18 +88,8 @@ void Timer::Initialize(void) */ void Timer::Call(void) { - double st = Utility::GetTime(); - OnTimerExpired(GetSelf()); - double et = Utility::GetTime(); - - if (et - st > 1.0) { - stringstream msgbuf; - msgbuf << "Timer call took " << et - st << " seconds."; - Logger::Write(LogWarning, "base", msgbuf.str()); - } - /* Re-enable the timer so it can be called again. */ m_Started = true; Reschedule(); @@ -118,8 +126,6 @@ double Timer::GetInterval(void) const */ void Timer::Start(void) { - boost::call_once(&Timer::Initialize, m_ThreadOnce); - m_Started = true; Reschedule(); @@ -232,9 +238,12 @@ void Timer::TimerThreadProc(void) NextTimerView& idx = boost::get<1>(m_Timers); /* Wait until there is at least one timer. */ - while (idx.empty()) + while (idx.empty() && !m_StopThread) m_CV.wait(lock); + if (m_StopThread) + break; + NextTimerView::iterator it = idx.begin(); Timer::Ptr timer = it->lock(); diff --git a/lib/base/timer.h b/lib/base/timer.h index a2126b9fa..e06ad26a1 100644 --- a/lib/base/timer.h +++ b/lib/base/timer.h @@ -64,6 +64,9 @@ public: signals2::signal OnTimerExpired; + static void Initialize(void); + static void Uninitialize(void); + private: double m_Interval; /**< The interval of the timer. */ double m_Next; /**< When the next event should happen. */ @@ -79,13 +82,12 @@ private: static boost::mutex m_Mutex; static boost::condition_variable m_CV; + static thread m_Thread; + static bool m_StopThread; static TimerSet m_Timers; void Call(void); - static boost::once_flag m_ThreadOnce; - static void Initialize(void); - static void TimerThreadProc(void); friend struct TimerNextExtractor; diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index 784f63ec7..8a50fd9be 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -383,18 +383,14 @@ void ConfigItem::UnloadUnit(const String& unit) vector obsoleteItems; - { - boost::mutex::scoped_lock lock(m_Mutex); - - ConfigItem::Ptr item; - BOOST_FOREACH(tie(tuples::ignore, item), m_Items) { - ObjectLock olock(item); + ConfigItem::Ptr item; + BOOST_FOREACH(tie(tuples::ignore, item), m_Items) { + ObjectLock olock(item); - if (item->GetUnit() != unit) - continue; + if (item->GetUnit() != unit) + continue; - obsoleteItems.push_back(item); - } + obsoleteItems.push_back(item); } BOOST_FOREACH(const ConfigItem::Ptr& item, obsoleteItems) { diff --git a/lib/icinga/externalcommandprocessor.cpp b/lib/icinga/externalcommandprocessor.cpp index 337d36ea6..fd22a52f4 100644 --- a/lib/icinga/externalcommandprocessor.cpp +++ b/lib/icinga/externalcommandprocessor.cpp @@ -349,12 +349,12 @@ void ExternalCommandProcessor::AcknowledgeHostProblem(double, const vectorIsUp()) - BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK.")); - Logger::Write(LogInformation, "icinga", "Setting acknowledgement for host '" + host->GetName() + "'"); Service::Ptr service = host->GetHostCheckService(); if (service) { + if (service->GetState() == StateOK) + BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK.")); + service->SetAcknowledgement(sticky ? AcknowledgementSticky : AcknowledgementNormal); service->SetAcknowledgementExpiry(0); } @@ -370,12 +370,12 @@ void ExternalCommandProcessor::AcknowledgeHostProblemExpire(double, const vector Host::Ptr host = Host::GetByName(arguments[0]); - if (host->IsUp()) - BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK.")); - Logger::Write(LogInformation, "icinga", "Setting timed acknowledgement for host '" + host->GetName() + "'"); Service::Ptr service = host->GetHostCheckService(); if (service) { + if (service->GetState() == StateOK) + BOOST_THROW_EXCEPTION(invalid_argument("The host '" + arguments[0] + "' is OK.")); + service->SetAcknowledgement(sticky ? AcknowledgementSticky : AcknowledgementNormal); service->SetAcknowledgementExpiry(timestamp); } diff --git a/lib/icinga/host.cpp b/lib/icinga/host.cpp index bd22f4297..5c5e0bf0d 100644 --- a/lib/icinga/host.cpp +++ b/lib/icinga/host.cpp @@ -34,7 +34,14 @@ REGISTER_TYPE(Host, hostAttributes); Host::Host(const Dictionary::Ptr& properties) : DynamicObject(properties) -{ } +{ + HostGroup::InvalidateMembersCache(); +} + +void Host::OnInitCompleted(void) +{ + UpdateSlaveServices(); +} Host::~Host(void) { @@ -105,9 +112,18 @@ String Host::GetHostCheck(void) const return Get("hostcheck"); } -bool Host::IsReachable(void) +bool Host::IsReachable(const Host::Ptr& self) { - BOOST_FOREACH(const Service::Ptr& service, GetParentServices()) { + set parentServices; + + { + ObjectLock olock(self); + parentServices = self->GetParentServices(); + } + + BOOST_FOREACH(const Service::Ptr& service, parentServices) { + ObjectLock olock(service); + /* ignore pending services */ if (!service->GetLastCheckResult()) continue; @@ -124,37 +140,29 @@ bool Host::IsReachable(void) return false; } - BOOST_FOREACH(const Host::Ptr& host, GetParentHosts()) { - /* ignore hosts that are up */ - if (host->IsUp()) - continue; + set parentHosts; - return false; + { + ObjectLock olock(self); + parentHosts = self->GetParentHosts(); } - return true; -} - -bool Host::IsInDowntime(void) const -{ - Service::Ptr service = GetHostCheckService(); - - if (!service) - return false; + BOOST_FOREACH(const Host::Ptr& host, parentHosts) { + Service::Ptr hc; - ObjectLock olock(service); - return (service || service->IsInDowntime()); -} + { + ObjectLock olock(host); + hc = host->GetHostCheckService(); + } -bool Host::IsUp(void) const -{ - Service::Ptr service = GetHostCheckService(); + /* ignore hosts that are up */ + if (hc && hc->GetState() == StateOK) + continue; - if (!service) - return true; + return false; + } - ObjectLock olock(service); - return (service->GetState() == StateOK || service->GetState() == StateWarning); + return true; } template @@ -280,10 +288,19 @@ void Host::OnAttributeChanged(const String& name, const Value&) { if (name == "hostgroups") HostGroup::InvalidateMembersCache(); - else if (name == "services") + else if (name == "services") { + ObjectLock olock(this); UpdateSlaveServices(); - else if (name == "notifications") { - BOOST_FOREACH(const Service::Ptr& service, GetServices()) { + } else if (name == "notifications") { + set services; + + { + ObjectLock olock(this); + services = GetServices(); + } + + BOOST_FOREACH(const Service::Ptr& service, services) { + ObjectLock olock(service); service->UpdateSlaveNotifications(); } } diff --git a/lib/icinga/host.h b/lib/icinga/host.h index 7b4d18139..936da7dac 100644 --- a/lib/icinga/host.h +++ b/lib/icinga/host.h @@ -56,9 +56,7 @@ public: set GetParentHosts(void) const; set > GetParentServices(void) const; - bool IsReachable(void); - bool IsInDowntime(void) const; - bool IsUp(void) const; + static bool IsReachable(const Host::Ptr& self); shared_ptr GetServiceByShortName(const Value& name) const; @@ -69,6 +67,7 @@ public: const std::vector& arguments); protected: + void OnInitCompleted(void); void OnAttributeChanged(const String& name, const Value& oldValue); private: diff --git a/lib/icinga/notification.cpp b/lib/icinga/notification.cpp index 1c080b604..5d90a6d35 100644 --- a/lib/icinga/notification.cpp +++ b/lib/icinga/notification.cpp @@ -95,7 +95,10 @@ void Notification::NotificationCompletedHandler(const ScriptTask::Ptr& task) m_Tasks.erase(task); try { - (void) task->GetResult(); + { + ObjectLock tlock(task); + (void) task->GetResult(); + } Logger::Write(LogInformation, "icinga", "Completed sending notification for service '" + GetService()->GetName() + "'"); } catch (const exception& ex) { diff --git a/lib/icinga/pluginnotificationtask.cpp b/lib/icinga/pluginnotificationtask.cpp index 4832ec500..c66671180 100644 --- a/lib/icinga/pluginnotificationtask.cpp +++ b/lib/icinga/pluginnotificationtask.cpp @@ -97,7 +97,10 @@ void PluginNotificationTask::ProcessFinishedHandler(PluginNotificationTask ct) ProcessResult pr; try { - pr = ct.m_Process->GetResult(); + { + ObjectLock tlock(ct.m_Process); + pr = ct.m_Process->GetResult(); + } if (pr.ExitStatus != 0) { stringstream msgbuf; diff --git a/lib/icinga/service-check.cpp b/lib/icinga/service-check.cpp index 0fcab976c..579d6b92b 100644 --- a/lib/icinga/service-check.cpp +++ b/lib/icinga/service-check.cpp @@ -350,9 +350,9 @@ void Service::ApplyCheckResult(const Dictionary::Ptr& cr) /* Make sure the notification component sees the updated * state/state_type attributes. */ - DynamicObject::NewTx(); + Flush(); - if (IsReachable() && !IsInDowntime() && !IsAcknowledged()) + if (IsReachable(GetSelf()) && !IsInDowntime() && !IsAcknowledged()) RequestNotifications(NotificationStateChange); } } @@ -457,7 +457,12 @@ void Service::CheckCompletedHandler(const Dictionary::Ptr& scheduleInfo, Dictionary::Ptr result; try { - Value vresult = task->GetResult(); + Value vresult; + + { + ObjectLock tlock(task); + vresult = task->GetResult(); + } if (vresult.IsObjectType()) result = vresult; @@ -510,9 +515,9 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr) Service::UpdateStatistics(cr); - /* flush the current transaction so other instances see the service's + /* Flush the object so other instances see the service's * new state when they receive the CheckResult message */ - DynamicObject::NewTx(); + Flush(); RequestMessage rm; rm.SetMethod("checker::CheckResult"); diff --git a/lib/icinga/service-notification.cpp b/lib/icinga/service-notification.cpp index 7e684e4ff..21ba7a737 100644 --- a/lib/icinga/service-notification.cpp +++ b/lib/icinga/service-notification.cpp @@ -125,7 +125,17 @@ void Service::UpdateSlaveNotifications(void) newNotifications = boost::make_shared(); vector notificationDescsList; - notificationDescsList.push_back(GetHost()->Get("notifications")); + + String host_name; + + { + Host::Ptr host = GetHost(); + ObjectLock olock(host); + + notificationDescsList.push_back(host->Get("notifications")); + host_name = host->GetName(); + } + notificationDescsList.push_back(Get("notifications")); BOOST_FOREACH(const Dictionary::Ptr& notificationDescs, notificationDescsList) { @@ -145,7 +155,7 @@ void Service::UpdateSlaveNotifications(void) ConfigItemBuilder::Ptr builder = boost::make_shared(item->GetDebugInfo()); builder->SetType("Notification"); builder->SetName(name); - builder->AddExpression("host_name", OperatorSet, GetHost()->GetName()); + builder->AddExpression("host_name", OperatorSet, host_name); builder->AddExpression("service", OperatorSet, GetName()); CopyNotificationAttributes(this, builder); diff --git a/lib/icinga/service.cpp b/lib/icinga/service.cpp index 5eb15257d..e2f1f68a7 100644 --- a/lib/icinga/service.cpp +++ b/lib/icinga/service.cpp @@ -47,7 +47,17 @@ REGISTER_TYPE(Service, serviceAttributes); Service::Service(const Dictionary::Ptr& serializedObject) : DynamicObject(serializedObject) -{ } +{ + ServiceGroup::InvalidateMembersCache(); + Host::InvalidateServicesCache(); + Service::InvalidateDowntimesCache(); + Service::InvalidateCommentsCache(); +} + +void Service::OnInitCompleted(void) +{ + UpdateSlaveNotifications(); +} Service::~Service(void) { @@ -142,9 +152,18 @@ String Service::GetShortName(void) const return value; } -bool Service::IsReachable(void) const +bool Service::IsReachable(const Service::Ptr& self) { - BOOST_FOREACH(const Service::Ptr& service, GetParentServices()) { + set parentServices; + + { + ObjectLock olock(self); + parentServices = self->GetParentServices(); + } + + BOOST_FOREACH(const Service::Ptr& service, parentServices) { + ObjectLock olock(service); + /* ignore pending services */ if (!service->GetLastCheckResult()) continue; @@ -161,9 +180,23 @@ bool Service::IsReachable(void) const return false; } - BOOST_FOREACH(const Host::Ptr& host, GetParentHosts()) { + set parentHosts; + + { + ObjectLock olock(self); + parentHosts = self->GetParentHosts(); + } + + BOOST_FOREACH(const Host::Ptr& host, parentHosts) { + Service::Ptr hc; + + { + ObjectLock olock(host); + hc = host->GetHostCheckService(); + } + /* ignore hosts that are up */ - if (host->IsUp()) + if (hc && hc->GetState() == StateOK) continue; return false; @@ -222,8 +255,6 @@ void Service::SetAcknowledgementExpiry(double timestamp) void Service::OnAttributeChanged(const String& name, const Value& oldValue) { - ObjectLock olock(this); - if (name == "checker") OnCheckerChanged(GetSelf(), oldValue); else if (name == "next_check") @@ -232,7 +263,11 @@ void Service::OnAttributeChanged(const String& name, const Value& oldValue) ServiceGroup::InvalidateMembersCache(); else if (name == "host_name" || name == "short_name") { Host::InvalidateServicesCache(); - UpdateSlaveNotifications(); + + { + ObjectLock olock(this); + UpdateSlaveNotifications(); + } } else if (name == "downtimes") Service::InvalidateDowntimesCache(); else if (name == "comments") @@ -240,6 +275,7 @@ void Service::OnAttributeChanged(const String& name, const Value& oldValue) else if (name == "notifications") UpdateSlaveNotifications(); else if (name == "check_interval") { + ObjectLock(this); ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName()); /* update the next check timestamp if we're the owner of this service */ diff --git a/lib/icinga/service.h b/lib/icinga/service.h index 2c3d9e99d..8a44fc198 100644 --- a/lib/icinga/service.h +++ b/lib/icinga/service.h @@ -111,7 +111,7 @@ public: set GetParentHosts(void) const; set GetParentServices(void) const; - bool IsReachable(void) const; + bool IsReachable(const Service::Ptr& self); AcknowledgementType GetAcknowledgement(void); void SetAcknowledgement(AcknowledgementType acknowledgement); @@ -248,6 +248,7 @@ public: void SetNextNotification(double time); protected: + virtual void OnInitCompleted(void); virtual void OnAttributeChanged(const String& name, const Value& oldValue); private: