From: Gunnar Beutner Date: Sat, 6 Jan 2018 03:36:58 +0000 (+0100) Subject: Avoid mutex contention in the config parser X-Git-Tag: v2.9.0~237^2 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d9010c7b9faaec137f3e195b370edbb406c37d76;p=icinga2 Avoid mutex contention in the config parser --- diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index a18c9818b..e55eab2ad 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -60,25 +60,17 @@ String WorkQueue::GetName() const return m_Name; } +boost::mutex::scoped_lock WorkQueue::AcquireLock() +{ + return boost::mutex::scoped_lock(m_Mutex); +} + /** * Enqueues a task. Tasks are guaranteed to be executed in the order - * they were enqueued in except if there is more than one worker thread or when - * allowInterleaved is true in which case the new task might be run - * immediately if it's being enqueued from within the WorkQueue thread. + * they were enqueued in except if there is more than one worker thread. */ -void WorkQueue::Enqueue(std::function&& function, WorkQueuePriority priority, - bool allowInterleaved) +void WorkQueue::EnqueueUnlocked(boost::mutex::scoped_lock& lock, std::function&& function, WorkQueuePriority priority) { - bool wq_thread = IsWorkerThread(); - - if (wq_thread && allowInterleaved) { - function(); - - return; - } - - boost::mutex::scoped_lock lock(m_Mutex); - if (!m_Spawned) { Log(LogNotice, "WorkQueue") << "Spawning WorkQueue threads for '" << m_Name << "'"; @@ -90,6 +82,8 @@ void WorkQueue::Enqueue(std::function&& function, WorkQueuePriority pri m_Spawned = true; } + bool wq_thread = IsWorkerThread(); + if (!wq_thread) { while (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0) m_CVFull.wait(lock); @@ -100,6 +94,27 @@ void WorkQueue::Enqueue(std::function&& function, WorkQueuePriority pri m_CVEmpty.notify_one(); } +/** + * Enqueues a task. Tasks are guaranteed to be executed in the order + * they were enqueued in except if there is more than one worker thread or when + * allowInterleaved is true in which case the new task might be run + * immediately if it's being enqueued from within the WorkQueue thread. + */ +void WorkQueue::Enqueue(std::function&& function, WorkQueuePriority priority, + bool allowInterleaved) +{ + bool wq_thread = IsWorkerThread(); + + if (wq_thread && allowInterleaved) { + function(); + + return; + } + + auto lock = AcquireLock(); + EnqueueUnlocked(lock, std::move(function), priority); +} + /** * Waits until all currently enqueued tasks have completed. This only works reliably * when no other thread is enqueuing new tasks when this method is called. @@ -231,6 +246,25 @@ void WorkQueue::StatusTimerHandler() } } +void WorkQueue::RunTaskFunction(const TaskFunction& func) +{ + try { + func(); + } catch (const std::exception&) { + boost::exception_ptr eptr = boost::current_exception(); + + { + boost::mutex::scoped_lock mutex(m_Mutex); + + if (!m_ExceptionCallback) + m_Exceptions.push_back(eptr); + } + + if (m_ExceptionCallback) + m_ExceptionCallback(eptr); + } +} + void WorkQueue::WorkerThreadProc() { std::ostringstream idbuf; @@ -258,19 +292,7 @@ void WorkQueue::WorkerThreadProc() lock.unlock(); - try { - task.Function(); - } catch (const std::exception&) { - lock.lock(); - - if (!m_ExceptionCallback) - m_Exceptions.push_back(boost::current_exception()); - - lock.unlock(); - - if (m_ExceptionCallback) - m_ExceptionCallback(boost::current_exception()); - } + RunTaskFunction(task.Function); /* clear the task so whatever other resources it holds are released _before_ we re-acquire the mutex */ task = Task(); diff --git a/lib/base/workqueue.hpp b/lib/base/workqueue.hpp index 14d0dd5fb..cdb6c437b 100644 --- a/lib/base/workqueue.hpp +++ b/lib/base/workqueue.hpp @@ -41,15 +41,17 @@ enum WorkQueuePriority PriorityHigh }; +using TaskFunction = std::function; + struct Task { Task() = default; - Task(std::function function, WorkQueuePriority priority, int id) + Task(TaskFunction function, WorkQueuePriority priority, int id) : Function(std::move(function)), Priority(priority), ID(id) { } - std::function Function; + TaskFunction Function; WorkQueuePriority Priority{PriorityNormal}; int ID{-1}; }; @@ -72,10 +74,42 @@ public: void SetName(const String& name); String GetName() const; - void Enqueue(std::function&& function, WorkQueuePriority priority = PriorityNormal, + boost::mutex::scoped_lock AcquireLock(); + void EnqueueUnlocked(boost::mutex::scoped_lock& lock, TaskFunction&& function, WorkQueuePriority priority = PriorityNormal); + void Enqueue(TaskFunction&& function, WorkQueuePriority priority = PriorityNormal, bool allowInterleaved = false); void Join(bool stop = false); + template + void ParallelFor(const VectorType& items, const FuncType& func) + { + using SizeType = decltype(items.size()); + + SizeType totalCount = items.size(); + + auto lock = AcquireLock(); + + SizeType offset = 0; + + for (int i = 0; i < m_ThreadCount; i++) { + SizeType count = totalCount / static_cast(m_ThreadCount); + if (static_cast(i) < totalCount % static_cast(m_ThreadCount)) + count++; + + EnqueueUnlocked(lock, [&items, func, offset, count, this]() { + for (SizeType j = offset; j < offset + count; j++) { + RunTaskFunction([&func, &items, j]() { + func(items[j]); + }); + } + }); + + offset += count; + } + + ASSERT(offset == items.size()); + } + bool IsWorkerThread() const; size_t GetLength() const; @@ -119,6 +153,8 @@ private: void WorkerThreadProc(); void StatusTimerHandler(); + + void RunTaskFunction(const TaskFunction& func); }; } diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index 87037e92f..e1b9da781 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -428,12 +428,12 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue if (items.empty()) return true; - for (const ItemPair& ip : items) { + for (const auto& ip : items) newItems.push_back(ip.first); - upq.Enqueue([&]() { - ip.first->Commit(ip.second); - }); - } + + upq.ParallelFor(items, [](const ItemPair& ip) { + ip.first->Commit(ip.second); + }); upq.Join(); @@ -468,36 +468,29 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue if (unresolved_dep) continue; - for (const ItemPair& ip : items) { + upq.ParallelFor(items, [&type](const ItemPair& ip) { const ConfigItem::Ptr& item = ip.first; - if (!item->m_Object) - continue; + if (!item->m_Object || item->m_Type != type) + return; - if (item->m_Type == type) { - upq.Enqueue([&]() { - try { - item->m_Object->OnAllConfigLoaded(); - } catch (const std::exception& ex) { - if (item->m_IgnoreOnError) { - Log(LogNotice, "ConfigObject") - << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex); - - item->Unregister(); + try { + item->m_Object->OnAllConfigLoaded(); + } catch (const std::exception& ex) { + if (!item->m_IgnoreOnError) + throw; - { - boost::mutex::scoped_lock lock(item->m_Mutex); - item->m_IgnoredItems.push_back(item->m_DebugInfo.Path); - } + Log(LogNotice, "ConfigObject") + << "Ignoring config object '" << item->m_Name << "' of type '" << item->m_Type->GetName() << "' due to errors: " << DiagnosticInformation(ex); - return; - } + item->Unregister(); - throw; - } - }); + { + boost::mutex::scoped_lock lock(item->m_Mutex); + item->m_IgnoredItems.push_back(item->m_DebugInfo.Path); + } } - } + }); completed_types.insert(type); @@ -507,19 +500,15 @@ bool ConfigItem::CommitNewItems(const ActivationContext::Ptr& context, WorkQueue return false; for (const String& loadDep : type->GetLoadDependencies()) { - for (const ItemPair& ip : items) { + upq.ParallelFor(items, [loadDep, &type](const ItemPair& ip) { const ConfigItem::Ptr& item = ip.first; - if (!item->m_Object) - continue; + if (!item->m_Object || item->m_Type->GetName() != loadDep) + return; - if (item->m_Type->GetName() == loadDep) { - upq.Enqueue([&]() { - ActivationScope ascope(item->m_ActivationContext); - item->m_Object->CreateChildObjects(type); - }); - } - } + ActivationScope ascope(item->m_ActivationContext); + item->m_Object->CreateChildObjects(type); + }); } upq.Join(); @@ -606,14 +595,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vectorGetName() << "' of type '" << object->GetReflectionType()->GetName() << "'"; #endif /* I2_DEBUG */ - upq.Enqueue(std::bind(&ConfigObject::PreActivate, object)); - } - - upq.Join(); - if (upq.HasExceptions()) { - upq.ReportExceptions("ConfigItem"); - return false; + object->PreActivate(); } if (!silent) @@ -629,7 +612,8 @@ bool ConfigItem::ActivateItems(WorkQueue& upq, const std::vectorGetName() << "' of type '" << object->GetReflectionType()->GetName() << "'"; #endif /* I2_DEBUG */ - upq.Enqueue(std::bind(&ConfigObject::Activate, object, runtimeCreated)); + + object->Activate(runtimeCreated); } upq.Join();