From 1daeb8c010f6aa2eaf1acc8f8ef97febbaf8a1c4 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Tue, 19 Feb 2013 12:17:31 +0100 Subject: [PATCH] Fine-grained locks (WIP, Part 5). --- .../replication/replicationcomponent.cpp | 11 ++-- components/replication/replicationcomponent.h | 2 +- lib/base/asynctask.h | 16 +++--- lib/base/dynamicobject.cpp | 53 +++++++++++++------ lib/base/dynamicobject.h | 4 +- lib/base/eventqueue.cpp | 10 ++-- lib/base/script.cpp | 2 + lib/base/streamlogger.cpp | 3 ++ lib/base/streamlogger.h | 1 + lib/icinga/host.cpp | 12 ++++- lib/icinga/service.cpp | 2 + 11 files changed, 81 insertions(+), 35 deletions(-) diff --git a/components/replication/replicationcomponent.cpp b/components/replication/replicationcomponent.cpp index e8a65767b..3fb30b7ca 100644 --- a/components/replication/replicationcomponent.cpp +++ b/components/replication/replicationcomponent.cpp @@ -160,7 +160,7 @@ void ReplicationComponent::LocalObjectUnregisteredHandler(const DynamicObject::P MakeObjectMessage(object, "config::ObjectRemoved", 0, false)); } -void ReplicationComponent::TransactionClosingHandler(const set& modifiedObjects) +void ReplicationComponent::TransactionClosingHandler(const set& modifiedObjects) { if (modifiedObjects.empty()) return; @@ -169,11 +169,14 @@ void ReplicationComponent::TransactionClosingHandler(const set& msgbuf << "Sending " << modifiedObjects.size() << " replication updates."; Logger::Write(LogDebug, "replication", msgbuf.str()); - BOOST_FOREACH(DynamicObject *robject, modifiedObjects) { - DynamicObject::Ptr object = robject->GetSelf(); + BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) { + DynamicObject::Ptr object = wobject.lock(); + + if (!object) + continue; if (!ShouldReplicateObject(object)) - continue; + continue; RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true); EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request); diff --git a/components/replication/replicationcomponent.h b/components/replication/replicationcomponent.h index de965ab32..4843836f4 100644 --- a/components/replication/replicationcomponent.h +++ b/components/replication/replicationcomponent.h @@ -41,7 +41,7 @@ private: void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object); void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object); - void TransactionClosingHandler(const set& modifiedObjects); + void TransactionClosingHandler(const set& modifiedObjects); void RemoteObjectUpdateHandler(const RequestMessage& request); void RemoteObjectRemovedHandler(const RequestMessage& request); diff --git a/lib/base/asynctask.h b/lib/base/asynctask.h index 6fb942973..39c15d634 100644 --- a/lib/base/asynctask.h +++ b/lib/base/asynctask.h @@ -86,6 +86,7 @@ public: */ TResult GetResult(void) { + boost::mutex::scoped_lock lock(m_Mutex); if (!m_Finished) BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask")); @@ -109,6 +110,7 @@ public: */ void FinishException(const boost::exception_ptr& ex) { + boost::mutex::scoped_lock lock(m_Mutex); m_Exception = ex; FinishInternal(); } @@ -120,6 +122,7 @@ public: */ void FinishResult(const TResult& result) { + boost::mutex::scoped_lock lock(m_Mutex); m_Result = result; FinishInternal(); } @@ -146,20 +149,19 @@ private: /** * Finishes the task and causes the completion callback to be invoked. This * function must be called before the object is destroyed. + * + * @threadsafety Caller must hold m_Mutex. */ void FinishInternal(void) { - { - boost::mutex::scoped_lock lock(m_Mutex); - assert(!m_Finished); + assert(!m_Finished); - m_Finished = true; + m_Finished = true; - m_CV.notify_all(); - } + m_CV.notify_all(); if (!m_CompletionCallback.empty()) { - m_CompletionCallback(GetSelf()); + Utility::QueueAsyncCallback(boost::bind(m_CompletionCallback, GetSelf())); /* Clear callback because the bound function might hold a * reference to this task. */ diff --git a/lib/base/dynamicobject.cpp b/lib/base/dynamicobject.cpp index c4683e1be..51e8f07c6 100644 --- a/lib/base/dynamicobject.cpp +++ b/lib/base/dynamicobject.cpp @@ -22,14 +22,14 @@ using namespace icinga; double DynamicObject::m_CurrentTx = 0; -set DynamicObject::m_ModifiedObjects; +set DynamicObject::m_ModifiedObjects; boost::mutex DynamicObject::m_TransactionMutex; boost::once_flag DynamicObject::m_TransactionOnce; Timer::Ptr DynamicObject::m_TransactionTimer; signals2::signal DynamicObject::OnRegistered; signals2::signal DynamicObject::OnUnregistered; -signals2::signal&)> DynamicObject::OnTransactionClosing; +signals2::signal&)> DynamicObject::OnTransactionClosing; DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject) : m_Events(false), m_ConfigTx(0) @@ -63,10 +63,7 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject) * @threadsafety Always. */ DynamicObject::~DynamicObject(void) -{ - boost::mutex::scoped_lock lock(m_TransactionMutex); - m_ModifiedObjects.erase(this); -} +{ } void DynamicObject::Initialize(void) { @@ -82,15 +79,19 @@ void DynamicObject::Initialize(void) */ void DynamicObject::SendLocalUpdateEvents(void) { + map attrs; + + { + ObjectLock olock(this); + attrs.swap(m_ModifiedAttributes); + } + /* Check if it's safe to send events. */ if (GetEvents()) { map::iterator it; - for (it = m_ModifiedAttributes.begin(); it != m_ModifiedAttributes.end(); it++) { + for (it = attrs.begin(); it != attrs.end(); it++) OnAttributeChanged(it->first, it->second); - } } - - m_ModifiedAttributes.clear(); } Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const @@ -235,9 +236,19 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data, if (tt.first->second.Type & Attribute_Config) m_ConfigTx = tx; - { + DynamicObject::Ptr self; + try { + self = GetSelf(); + } catch (const boost::bad_weak_ptr& ex) { + /* We're being called from the constructor. Ignore + * the exception. The OnInitCompleted() function + * will take care of adding this object to the list + * of modified objects. */ + } + + if (self) { boost::mutex::scoped_lock lock(m_TransactionMutex); - m_ModifiedObjects.insert(this); + m_ModifiedObjects.insert(self); } /* Use insert() rather than [] so we don't overwrite @@ -527,7 +538,7 @@ double DynamicObject::GetCurrentTx(void) void DynamicObject::NewTx(void) { double tx; - set objects; + set objects; { boost::mutex::scoped_lock lock(m_TransactionMutex); @@ -537,8 +548,12 @@ void DynamicObject::NewTx(void) m_CurrentTx = Utility::GetTime(); } - BOOST_FOREACH(DynamicObject *object, objects) { - ObjectLock olock(object); + BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, objects) { + DynamicObject::Ptr object = wobject.lock(); + + if (!object) + continue; + object->SendLocalUpdateEvents(); } @@ -546,7 +561,13 @@ void DynamicObject::NewTx(void) } void DynamicObject::OnInitCompleted(void) -{ } +{ + /* Add this new object to the list of modified objects. + * We're doing this here because we can't construct + * a while WeakPtr from within the object's constructor. */ + boost::mutex::scoped_lock lock(m_TransactionMutex); + m_ModifiedObjects.insert(GetSelf()); +} void DynamicObject::OnAttributeChanged(const String&, const Value&) { } diff --git a/lib/base/dynamicobject.h b/lib/base/dynamicobject.h index 8e1b5565d..e78d7cfad 100644 --- a/lib/base/dynamicobject.h +++ b/lib/base/dynamicobject.h @@ -98,7 +98,7 @@ public: static signals2::signal OnRegistered; static signals2::signal OnUnregistered; - static signals2::signal&)> OnTransactionClosing; + static signals2::signal&)> OnTransactionClosing; ScriptTask::Ptr MakeMethodTask(const String& method, const vector& arguments); @@ -153,7 +153,7 @@ private: /* This has to be a set of raw pointers because the DynamicObject * constructor has to be able to insert objects into this list. */ - static set m_ModifiedObjects; + static set m_ModifiedObjects; static boost::mutex m_TransactionMutex; static boost::once_flag m_TransactionOnce; static Timer::Ptr m_TransactionTimer; diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp index 30e75791d..7fe42a886 100644 --- a/lib/base/eventqueue.cpp +++ b/lib/base/eventqueue.cpp @@ -27,12 +27,14 @@ using namespace icinga; EventQueue::EventQueue(void) : m_Stopped(false) { - int cpus = thread::hardware_concurrency(); + int thread_count = thread::hardware_concurrency(); - if (cpus < 4) - cpus = 4; + if (thread_count < 4) + thread_count = 4; - for (int i = 0; i < cpus; i++) + thread_count *= 8; + + for (int i = 0; i < thread_count; i++) m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this)); } diff --git a/lib/base/script.cpp b/lib/base/script.cpp index 0bcc5ca77..e83dc9b37 100644 --- a/lib/base/script.cpp +++ b/lib/base/script.cpp @@ -34,6 +34,8 @@ Script::Script(const Dictionary::Ptr& properties) void Script::OnInitCompleted(void) { + DynamicObject::OnInitCompleted(); + SpawnInterpreter(); } diff --git a/lib/base/streamlogger.cpp b/lib/base/streamlogger.cpp index 6cc447a99..1196ee1ed 100644 --- a/lib/base/streamlogger.cpp +++ b/lib/base/streamlogger.cpp @@ -21,6 +21,8 @@ using namespace icinga; +boost::mutex StreamLogger::m_Mutex; + /** * Constructor for the StreamLogger class. */ @@ -94,6 +96,7 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en } } + boost::mutex::scoped_lock lock(m_Mutex); stream << "[" << timestamp << "] " << Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": " << entry.Message; diff --git a/lib/base/streamlogger.h b/lib/base/streamlogger.h index c1719a6e1..3d5c8fc16 100644 --- a/lib/base/streamlogger.h +++ b/lib/base/streamlogger.h @@ -47,6 +47,7 @@ protected: virtual void ProcessLogEntry(const LogEntry& entry); private: + static boost::mutex m_Mutex; ostream *m_Stream; bool m_OwnsStream; bool m_Tty; diff --git a/lib/icinga/host.cpp b/lib/icinga/host.cpp index 554d22da3..bd22f4297 100644 --- a/lib/icinga/host.cpp +++ b/lib/icinga/host.cpp @@ -138,13 +138,23 @@ bool Host::IsReachable(void) bool Host::IsInDowntime(void) const { Service::Ptr service = GetHostCheckService(); + + if (!service) + return false; + + ObjectLock olock(service); return (service || service->IsInDowntime()); } bool Host::IsUp(void) const { Service::Ptr service = GetHostCheckService(); - return (!service || service->GetState() == StateOK || service->GetState() == StateWarning); + + if (!service) + return true; + + ObjectLock olock(service); + return (service->GetState() == StateOK || service->GetState() == StateWarning); } template diff --git a/lib/icinga/service.cpp b/lib/icinga/service.cpp index 6bf21848e..5eb15257d 100644 --- a/lib/icinga/service.cpp +++ b/lib/icinga/service.cpp @@ -222,6 +222,8 @@ void Service::SetAcknowledgementExpiry(double timestamp) void Service::OnAttributeChanged(const String& name, const Value& oldValue) { + ObjectLock olock(this); + if (name == "checker") OnCheckerChanged(GetSelf(), oldValue); else if (name == "next_check") -- 2.40.0