]> granicus.if.org Git - icinga2/commitdiff
Fine-grained locks (WIP, Part 5).
authorGunnar Beutner <gunnar.beutner@netways.de>
Tue, 19 Feb 2013 11:17:31 +0000 (12:17 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Tue, 19 Feb 2013 11:17:31 +0000 (12:17 +0100)
components/replication/replicationcomponent.cpp
components/replication/replicationcomponent.h
lib/base/asynctask.h
lib/base/dynamicobject.cpp
lib/base/dynamicobject.h
lib/base/eventqueue.cpp
lib/base/script.cpp
lib/base/streamlogger.cpp
lib/base/streamlogger.h
lib/icinga/host.cpp
lib/icinga/service.cpp

index e8a65767b85ec2379224ea462195a191afec4791..3fb30b7ca67990ab926968929d8933fd5b129dcf 100644 (file)
@@ -160,7 +160,7 @@ void ReplicationComponent::LocalObjectUnregisteredHandler(const DynamicObject::P
            MakeObjectMessage(object, "config::ObjectRemoved", 0, false));
 }
 
-void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject *>& modifiedObjects)
+void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject::WeakPtr>& modifiedObjects)
 {
        if (modifiedObjects.empty())
                return;
@@ -169,11 +169,14 @@ void ReplicationComponent::TransactionClosingHandler(const set<DynamicObject *>&
        msgbuf << "Sending " << modifiedObjects.size() << " replication updates.";
        Logger::Write(LogDebug, "replication", msgbuf.str());
 
-       BOOST_FOREACH(DynamicObject *robject, modifiedObjects) {
-               DynamicObject::Ptr object = robject->GetSelf();
+       BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, modifiedObjects) {
+               DynamicObject::Ptr object = wobject.lock();
+
+               if (!object)
+                       continue;
 
                if (!ShouldReplicateObject(object))
-                               continue;
+                       continue;
 
                RequestMessage request = MakeObjectMessage(object, "config::ObjectUpdate", DynamicObject::GetCurrentTx(), true);
                EndpointManager::GetInstance()->SendMulticastMessage(m_Endpoint, request);
index de965ab325dd4fd55e69ad701cac01e2d7468df8..4843836f4120f82fb6aabdcc2cda40547495335b 100644 (file)
@@ -41,7 +41,7 @@ private:
 
        void LocalObjectRegisteredHandler(const DynamicObject::Ptr& object);
        void LocalObjectUnregisteredHandler(const DynamicObject::Ptr& object);
-       void TransactionClosingHandler(const set<DynamicObject *>& modifiedObjects);
+       void TransactionClosingHandler(const set<DynamicObject::WeakPtr>& modifiedObjects);
 
        void RemoteObjectUpdateHandler(const RequestMessage& request);
        void RemoteObjectRemovedHandler(const RequestMessage& request);
index 6fb9429733cc5d27b5a000844f8883e21d8c6d67..39c15d634095999c36b1f201e284fe7260c148b6 100644 (file)
@@ -86,6 +86,7 @@ public:
         */
        TResult GetResult(void)
        {
+               boost::mutex::scoped_lock lock(m_Mutex);
                if (!m_Finished)
                        BOOST_THROW_EXCEPTION(runtime_error("GetResult called on an unfinished AsyncTask"));
 
@@ -109,6 +110,7 @@ public:
         */
        void FinishException(const boost::exception_ptr& ex)
        {
+               boost::mutex::scoped_lock lock(m_Mutex);
                m_Exception = ex;
                FinishInternal();
        }
@@ -120,6 +122,7 @@ public:
         */
        void FinishResult(const TResult& result)
        {
+               boost::mutex::scoped_lock lock(m_Mutex);
                m_Result = result;
                FinishInternal();
        }
@@ -146,20 +149,19 @@ private:
        /**
         * Finishes the task and causes the completion callback to be invoked. This
         * function must be called before the object is destroyed.
+        *
+        * @threadsafety Caller must hold m_Mutex.
         */
        void FinishInternal(void)
        {
-               {
-                       boost::mutex::scoped_lock lock(m_Mutex);
-                       assert(!m_Finished);
+               assert(!m_Finished);
 
-                       m_Finished = true;
+               m_Finished = true;
 
-                       m_CV.notify_all();
-               }
+               m_CV.notify_all();
 
                if (!m_CompletionCallback.empty()) {
-                       m_CompletionCallback(GetSelf());
+                       Utility::QueueAsyncCallback(boost::bind(m_CompletionCallback, GetSelf()));
 
                        /* Clear callback because the bound function might hold a
                         * reference to this task. */
index c4683e1beda42e106d4785655976427ad6992685..51e8f07c6d766b4e90e4af17442dfa066eeac315 100644 (file)
 using namespace icinga;
 
 double DynamicObject::m_CurrentTx = 0;
-set<DynamicObject *> DynamicObject::m_ModifiedObjects;
+set<DynamicObject::WeakPtr> DynamicObject::m_ModifiedObjects;
 boost::mutex DynamicObject::m_TransactionMutex;
 boost::once_flag DynamicObject::m_TransactionOnce;
 Timer::Ptr DynamicObject::m_TransactionTimer;
 
 signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnRegistered;
 signals2::signal<void (const DynamicObject::Ptr&)> DynamicObject::OnUnregistered;
-signals2::signal<void (double, const set<DynamicObject *>&)> DynamicObject::OnTransactionClosing;
+signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> DynamicObject::OnTransactionClosing;
 
 DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
        : m_Events(false), m_ConfigTx(0)
@@ -63,10 +63,7 @@ DynamicObject::DynamicObject(const Dictionary::Ptr& serializedObject)
  * @threadsafety Always.
  */
 DynamicObject::~DynamicObject(void)
-{
-       boost::mutex::scoped_lock lock(m_TransactionMutex);
-       m_ModifiedObjects.erase(this);
-}
+{ }
 
 void DynamicObject::Initialize(void)
 {
@@ -82,15 +79,19 @@ void DynamicObject::Initialize(void)
  */
 void DynamicObject::SendLocalUpdateEvents(void)
 {
+       map<String, Value, string_iless> attrs;
+
+       {
+               ObjectLock olock(this);
+               attrs.swap(m_ModifiedAttributes);
+       }
+
        /* Check if it's safe to send events. */
        if (GetEvents()) {
                map<String, Value, string_iless>::iterator it;
-               for (it = m_ModifiedAttributes.begin(); it != m_ModifiedAttributes.end(); it++) {
+               for (it = attrs.begin(); it != attrs.end(); it++)
                        OnAttributeChanged(it->first, it->second);
-               }
        }
-
-       m_ModifiedAttributes.clear();
 }
 
 Dictionary::Ptr DynamicObject::BuildUpdate(double sinceTx, int attributeTypes) const
@@ -235,9 +236,19 @@ void DynamicObject::InternalSetAttribute(const String& name, const Value& data,
        if (tt.first->second.Type & Attribute_Config)
                m_ConfigTx = tx;
 
-       {
+       DynamicObject::Ptr self;
+       try {
+               self = GetSelf();
+       } catch (const boost::bad_weak_ptr& ex) {
+               /* We're being called from the constructor. Ignore
+                * the exception. The OnInitCompleted() function
+                * will take care of adding this object to the list
+                * of modified objects. */
+       }
+
+       if (self) {
                boost::mutex::scoped_lock lock(m_TransactionMutex);
-               m_ModifiedObjects.insert(this);
+               m_ModifiedObjects.insert(self);
        }
 
        /* Use insert() rather than [] so we don't overwrite
@@ -527,7 +538,7 @@ double DynamicObject::GetCurrentTx(void)
 void DynamicObject::NewTx(void)
 {
        double tx;
-       set<DynamicObject *> objects;
+       set<DynamicObject::WeakPtr> objects;
 
        {
                boost::mutex::scoped_lock lock(m_TransactionMutex);
@@ -537,8 +548,12 @@ void DynamicObject::NewTx(void)
                m_CurrentTx = Utility::GetTime();
        }
 
-       BOOST_FOREACH(DynamicObject *object, objects) {
-               ObjectLock olock(object);
+       BOOST_FOREACH(const DynamicObject::WeakPtr& wobject, objects) {
+               DynamicObject::Ptr object = wobject.lock();
+
+               if (!object)
+                       continue;
+
                object->SendLocalUpdateEvents();
        }
 
@@ -546,7 +561,13 @@ void DynamicObject::NewTx(void)
 }
 
 void DynamicObject::OnInitCompleted(void)
-{ }
+{
+       /* Add this new object to the list of modified objects.
+        * We're doing this here because we can't construct
+        * a while WeakPtr from within the object's constructor. */
+       boost::mutex::scoped_lock lock(m_TransactionMutex);
+       m_ModifiedObjects.insert(GetSelf());
+}
 
 void DynamicObject::OnAttributeChanged(const String&, const Value&)
 { }
index 8e1b5565dd14c777c2e4cec26eec2fb09b331dde..e78d7cfad497128380123b34192feb5ed2473ed4 100644 (file)
@@ -98,7 +98,7 @@ public:
 
        static signals2::signal<void (const DynamicObject::Ptr&)> OnRegistered;
        static signals2::signal<void (const DynamicObject::Ptr&)> OnUnregistered;
-       static signals2::signal<void (double, const set<DynamicObject *>&)> OnTransactionClosing;
+       static signals2::signal<void (double, const set<DynamicObject::WeakPtr>&)> OnTransactionClosing;
 
        ScriptTask::Ptr MakeMethodTask(const String& method,
            const vector<Value>& arguments);
@@ -153,7 +153,7 @@ private:
 
        /* This has to be a set of raw pointers because the DynamicObject
         * constructor has to be able to insert objects into this list. */
-       static set<DynamicObject *> m_ModifiedObjects;
+       static set<DynamicObject::WeakPtr> m_ModifiedObjects;
        static boost::mutex m_TransactionMutex;
        static boost::once_flag m_TransactionOnce;
        static Timer::Ptr m_TransactionTimer;
index 30e75791d887ee61e7e766c779a3cd0f37a6bc59..7fe42a8868893e3866ab959cac9133dc9c2c93f5 100644 (file)
@@ -27,12 +27,14 @@ using namespace icinga;
 EventQueue::EventQueue(void)
        : m_Stopped(false)
 {
-       int cpus = thread::hardware_concurrency();
+       int thread_count = thread::hardware_concurrency();
 
-       if (cpus < 4)
-               cpus = 4;
+       if (thread_count < 4)
+               thread_count = 4;
 
-       for (int i = 0; i < cpus; i++)
+       thread_count *= 8;
+
+       for (int i = 0; i < thread_count; i++)
                m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this));
 }
 
index 0bcc5ca77380f50c63f74b60e7883d2652a1b067..e83dc9b37c8c71786281cbd8a3c842367fbff0a3 100644 (file)
@@ -34,6 +34,8 @@ Script::Script(const Dictionary::Ptr& properties)
 
 void Script::OnInitCompleted(void)
 {
+       DynamicObject::OnInitCompleted();
+
        SpawnInterpreter();
 }
 
index 6cc447a99171247c784554e04cfcb89669ca88af..1196ee1edd479877bb87a0c3272de1b5b529b573 100644 (file)
@@ -21,6 +21,8 @@
 
 using namespace icinga;
 
+boost::mutex StreamLogger::m_Mutex;
+
 /**
  * Constructor for the StreamLogger class.
  */
@@ -94,6 +96,7 @@ void StreamLogger::ProcessLogEntry(ostream& stream, bool tty, const LogEntry& en
                }
        }
 
+       boost::mutex::scoped_lock lock(m_Mutex);
        stream << "[" << timestamp << "] "
                 << Logger::SeverityToString(entry.Severity) << "/" << entry.Facility << ": "
                 << entry.Message;
index c1719a6e11e8177edb1e1b52d639dbc461a19079..3d5c8fc161e96a7b69c055cff68c3f1610fa45b7 100644 (file)
@@ -47,6 +47,7 @@ protected:
        virtual void ProcessLogEntry(const LogEntry& entry);
 
 private:
+       static boost::mutex m_Mutex;
        ostream *m_Stream;
        bool m_OwnsStream;
         bool m_Tty;
index 554d22da313a4242ea0293d0efe1ef3bf19a0aae..bd22f42971fe501394d5266dfde416867efc060f 100644 (file)
@@ -138,13 +138,23 @@ bool Host::IsReachable(void)
 bool Host::IsInDowntime(void) const
 {
        Service::Ptr service = GetHostCheckService();
+
+       if (!service)
+               return false;
+
+       ObjectLock olock(service);
        return (service || service->IsInDowntime());
 }
 
 bool Host::IsUp(void) const
 {
        Service::Ptr service = GetHostCheckService();
-       return (!service || service->GetState() == StateOK || service->GetState() == StateWarning);
+
+       if (!service)
+               return true;
+
+       ObjectLock olock(service);
+       return (service->GetState() == StateOK || service->GetState() == StateWarning);
 }
 
 template<bool copyServiceAttrs, typename TDict>
index 6bf21848e769b3c261db1e6da9cf02dff99fc0f6..5eb15257dcff08c5aacc408cee00e8386ba0370c 100644 (file)
@@ -222,6 +222,8 @@ void Service::SetAcknowledgementExpiry(double timestamp)
 
 void Service::OnAttributeChanged(const String& name, const Value& oldValue)
 {
+       ObjectLock olock(this);
+
        if (name == "checker")
                OnCheckerChanged(GetSelf(), oldValue);
        else if (name == "next_check")