]> granicus.if.org Git - icinga2/commitdiff
Avoid mutex contention in the config parser 5955/head
authorGunnar Beutner <gunnar.beutner@icinga.com>
Sat, 6 Jan 2018 03:36:58 +0000 (04:36 +0100)
committerGunnar Beutner <gunnar.beutner@icinga.com>
Mon, 8 Jan 2018 14:59:55 +0000 (15:59 +0100)
lib/base/workqueue.cpp
lib/base/workqueue.hpp
lib/config/configitem.cpp

index a18c9818bd6775f5a575a5c22eab5ca518dc0b13..e55eab2ad62e1f3422416dff457b7f26f25502d0 100644 (file)
@@ -60,25 +60,17 @@ String WorkQueue::GetName() const
        return m_Name;
 }
 
+boost::mutex::scoped_lock WorkQueue::AcquireLock()
+{
+       return boost::mutex::scoped_lock(m_Mutex);
+}
+
 /**
  * Enqueues a task. Tasks are guaranteed to be executed in the order
- * they were enqueued in except if there is more than one worker thread or when
- * allowInterleaved is true in which case the new task might be run
- * immediately if it's being enqueued from within the WorkQueue thread.
+ * they were enqueued in except if there is more than one worker thread.
  */
-void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority priority,
-       bool allowInterleaved)
+void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function<void ()>&& function, WorkQueuePriority priority)
 {
-       bool wq_thread = IsWorkerThread();
-
-       if (wq_thread && allowInterleaved) {
-               function();
-
-               return;
-       }
-
-       boost::mutex::scoped_lock lock(m_Mutex);
-
        if (!m_Spawned) {
                Log(LogNotice, "WorkQueue")
                        << "Spawning WorkQueue threads for '" << m_Name << "'";
@@ -90,6 +82,8 @@ void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority pri
                m_Spawned = true;
        }
 
+       bool wq_thread = IsWorkerThread();
+
        if (!wq_thread) {
                while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
                        m_CVFull.wait(lock);
@@ -100,6 +94,27 @@ void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority pri
        m_CVEmpty.notify_one();
 }
 
+/**
+ * Enqueues a task. Tasks are guaranteed to be executed in the order
+ * they were enqueued in except if there is more than one worker thread or when
+ * allowInterleaved is true in which case the new task might be run
+ * immediately if it's being enqueued from within the WorkQueue thread.
+ */
+void WorkQueue::Enqueue(std::function<void ()>&& function, WorkQueuePriority priority,
+       bool allowInterleaved)
+{
+       bool wq_thread = IsWorkerThread();
+
+       if (wq_thread && allowInterleaved) {
+               function();
+
+               return;
+       }
+
+       auto lock = AcquireLock();
+       EnqueueUnlocked(lock, std::move(function), priority);
+}
+
 /**
  * Waits until all currently enqueued tasks have completed. This only works reliably
  * when no other thread is enqueuing new tasks when this method is called.
@@ -231,6 +246,25 @@ void WorkQueue::StatusTimerHandler()
        }
 }
 
+void WorkQueue::RunTaskFunction(const TaskFunction& func)
+{
+       try {
+               func();
+       } catch (const std::exception&) {
+               boost::exception_ptr eptr = boost::current_exception();
+
+               {
+                       boost::mutex::scoped_lock mutex(m_Mutex);
+
+                       if (!m_ExceptionCallback)
+                               m_Exceptions.push_back(eptr);
+               }
+
+               if (m_ExceptionCallback)
+                       m_ExceptionCallback(eptr);
+       }
+}
+
 void WorkQueue::WorkerThreadProc()
 {
        std::ostringstream idbuf;
@@ -258,19 +292,7 @@ void WorkQueue::WorkerThreadProc()
 
                lock.unlock();
 
-               try {
-                       task.Function();
-               } catch (const std::exception&) {
-                       lock.lock();
-
-                       if (!m_ExceptionCallback)
-                               m_Exceptions.push_back(boost::current_exception());
-
-                       lock.unlock();
-
-                       if (m_ExceptionCallback)
-                               m_ExceptionCallback(boost::current_exception());
-               }
+               RunTaskFunction(task.Function);
 
                /* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */
                task = Task();
index 14d0dd5fb2b506a1c6eb30e3b5d7e96abf47c082..cdb6c437ba419a99839f9bfebe188b3db52f1216 100644 (file)
@@ -41,15 +41,17 @@ enum WorkQueuePriority
        PriorityHigh
 };
 
+using TaskFunction = std::function<void ()>;
+
 struct Task
 {
        Task() = default;
 
-       Task(std::function<void (void)> function, WorkQueuePriority priority, int id)
+       Task(TaskFunction function, WorkQueuePriority priority, int id)
                : Function(std::move(function)), Priority(priority), ID(id)
        { }
 
-       std::function<void (void)> Function;
+       TaskFunction Function;
        WorkQueuePriority Priority{PriorityNormal};
        int ID{-1};
 };
@@ -72,10 +74,42 @@ public:
        void SetName(const String& name);
        String GetName() const;
 
-       void Enqueue(std::function<void (void)>&& function, WorkQueuePriority priority = PriorityNormal,
+       boost::mutex::scoped_lock AcquireLock();
+       void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal);
+       void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal,
                bool allowInterleaved = false);
        void Join(bool stop = false);
 
+       template<typename VectorType, typename FuncType>
+       void ParallelFor(const VectorType& items, const FuncType& func)
+       {
+               using SizeType = decltype(items.size());
+
+               SizeType totalCount = items.size();
+
+               auto lock = AcquireLock();
+
+               SizeType offset = 0;
+
+               for (int i = 0; i < m_ThreadCount; i++) {
+                       SizeType count = totalCount / static_cast<SizeType>(m_ThreadCount);
+                       if (static_cast<SizeType>(i) < totalCount % static_cast<SizeType>(m_ThreadCount))
+                               count++;
+
+                       EnqueueUnlocked(lock, [&items, func, offset, count, this]() {
+                               for (SizeType j = offset; j < offset + count; j++) {
+                                       RunTaskFunction([&func, &items, j]() {
+                                               func(items[j]);
+                                       });
+                               }
+                       });
+
+                       offset += count;
+               }
+
+               ASSERT(offset == items.size());
+       }
+
        bool IsWorkerThread() const;
 
        size_t GetLength() const;
@@ -119,6 +153,8 @@ private:
 
        void WorkerThreadProc();
        void StatusTimerHandler();
+
+       void RunTaskFunction(const TaskFunction& func);
 };
 
 }
index 87037e92fb202e3bc18e27d612c4466fe92536ed..e1b9da781a72731e31180779dc38292540d79811 100644 (file)
@@ -428,12 +428,12 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
        if (items.empty())
                return true;
 
-       for (const ItemPair& ip : items) {
+       for (const auto& ip : items)
                newItems.push_back(ip.first);
-               upq.Enqueue([&]() {
-                       ip.first->Commit(ip.second);
-               });
-       }
+
+       upq.ParallelFor(items, [](const ItemPair& ip) {
+               ip.first->Commit(ip.second);
+       });
 
        upq.Join();
 
@@ -468,36 +468,29 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
                        if (unresolved_dep)
                                continue;
 
-                       for (const ItemPair& ip : items) {
+                       upq.ParallelFor(items, [&type](const ItemPair& ip) {
                                const ConfigItem::Ptr& item = ip.first;
 
-                               if (!item->m_Object)
-                                       continue;
+                               if (!item->m_Object || item->m_Type != type)
+                                       return;
 
-                               if (item->m_Type == type) {
-                                       upq.Enqueue([&]() {
-                                               try {
-                                                       item->m_Object->OnAllConfigLoaded();
-                                               } catch (const std::exception& ex) {
-                                                       if (item->m_IgnoreOnError) {
-                                                               Log(LogNotice, "ConfigObject")
-                                                                       << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
-
-                                                               item->Unregister();
+                               try {
+                                       item->m_Object->OnAllConfigLoaded();
+                               } catch (const std::exception& ex) {
+                                       if (!item->m_IgnoreOnError)
+                                               throw;
 
-                                                               {
-                                                                       boost::mutex::scoped_lock lock(item->m_Mutex);
-                                                                       item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
-                                                               }
+                                       Log(LogNotice, "ConfigObject")
+                                               << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex);
 
-                                                               return;
-                                                       }
+                                       item->Unregister();
 
-                                                       throw;
-                                               }
-                                       });
+                                       {
+                                               boost::mutex::scoped_lock lock(item->m_Mutex);
+                                               item->m_IgnoredItems.push_back(item->m_DebugInfo.Path);
+                                       }
                                }
-                       }
+                       });
 
                        completed_types.insert(type);
 
@@ -507,19 +500,15 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue
                                return false;
 
                        for (const String& loadDep : type->GetLoadDependencies()) {
-                               for (const ItemPair& ip : items) {
+                               upq.ParallelFor(items, [loadDep, &type](const ItemPair& ip) {
                                        const ConfigItem::Ptr& item = ip.first;
 
-                                       if (!item->m_Object)
-                                               continue;
+                                       if (!item->m_Object || item->m_Type->GetName() != loadDep)
+                                               return;
 
-                                       if (item->m_Type->GetName() == loadDep) {
-                                               upq.Enqueue([&]() {
-                                                       ActivationScope ascope(item->m_ActivationContext);
-                                                       item->m_Object->CreateChildObjects(type);
-                                               });
-                                       }
-                               }
+                                       ActivationScope ascope(item->m_ActivationContext);
+                                       item->m_Object->CreateChildObjects(type);
+                               });
                        }
 
                        upq.Join();
@@ -606,14 +595,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vector<ConfigItem::Ptr
                Log(LogDebug, "ConfigItem")
                        << "Setting 'active' to true for object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
 #endif /* I2_DEBUG */
-               upq.Enqueue(std::bind(&ConfigObject::PreActivate, object));
-       }
-
-       upq.Join();
 
-       if (upq.HasExceptions()) {
-               upq.ReportExceptions("ConfigItem");
-               return false;
+               object->PreActivate();
        }
 
        if (!silent)
@@ -629,7 +612,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vector<ConfigItem::Ptr
                Log(LogDebug, "ConfigItem")
                        << "Activating object '" << object->GetName() << "' of type '" << object->GetReflectionType()->GetName() << "'";
 #endif /* I2_DEBUG */
-               upq.Enqueue(std::bind(&ConfigObject::Activate, object, runtimeCreated));
+
+               object->Activate(runtimeCreated);
        }
 
        upq.Join();