From 5151f6567eefb28da3b749d103c1b3a0ab9ac78f Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Mon, 1 Apr 2019 17:05:16 +0200 Subject: [PATCH] ThreadPool: use the Boost ASIO thread pool under the hood --- lib/base/threadpool.cpp | 367 ++-------------------------------------- lib/base/threadpool.hpp | 120 +++++-------- 2 files changed, 54 insertions(+), 433 deletions(-) diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index 0021bb84b..d7819ef4a 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -1,23 +1,13 @@ /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ #include "base/threadpool.hpp" -#include "base/logger.hpp" -#include "base/debug.hpp" -#include "base/utility.hpp" -#include "base/exception.hpp" -#include "base/application.hpp" -#include +#include using namespace icinga; -int ThreadPool::m_NextID = 1; - -ThreadPool::ThreadPool(size_t max_threads) - : m_ID(m_NextID++), m_MaxThreads(max_threads) +ThreadPool::ThreadPool(size_t threads) + : m_Threads(threads) { - if (m_MaxThreads != UINT_MAX && m_MaxThreads < sizeof(m_Queues) / sizeof(m_Queues[0])) - m_MaxThreads = sizeof(m_Queues) / sizeof(m_Queues[0]); - Start(); } @@ -28,354 +18,19 @@ ThreadPool::~ThreadPool() void ThreadPool::Start() { - if (!m_Stopped) - return; - - m_Stopped = false; - - for (auto& queue : m_Queues) - queue.SpawnWorker(m_ThreadGroup); + boost::unique_lock lock (m_Mutex); - m_MgmtThread = std::thread(std::bind(&ThreadPool::ManagerThreadProc, this)); -} - -void ThreadPool::Stop() -{ - if (m_Stopped) - return; - - { - boost::mutex::scoped_lock lock(m_MgmtMutex); - m_Stopped = true; - m_MgmtCV.notify_all(); - } - - if (m_MgmtThread.joinable()) - m_MgmtThread.join(); - - for (auto& queue : m_Queues) { - boost::mutex::scoped_lock lock(queue.Mutex); - queue.Stopped = true; - queue.CV.notify_all(); + if (!m_Pool) { + m_Pool = decltype(m_Pool)(new boost::asio::thread_pool(m_Threads)); } - - m_ThreadGroup.join_all(); - m_ThreadGroup.~thread_group(); - new (&m_ThreadGroup) boost::thread_group(); - - for (auto& queue : m_Queues) - queue.Stopped = false; - - m_Stopped = true; } -/** - * Waits for work items and processes them. - */ -void ThreadPool::WorkerThread::ThreadProc(Queue& queue) -{ - std::ostringstream idbuf; - idbuf << "Q #" << &queue << " W #" << this; - Utility::SetThreadName(idbuf.str()); - - for (;;) { - WorkItem wi; - - { - boost::mutex::scoped_lock lock(queue.Mutex); - - UpdateUtilization(ThreadIdle); - - while (queue.Items.empty() && !queue.Stopped && !Zombie) { - if (queue.Items.empty()) - queue.CVStarved.notify_all(); - - queue.CV.wait(lock); - } - - if (Zombie) - break; - - if (queue.Items.empty() && queue.Stopped) - break; - - wi = queue.Items.front(); - queue.Items.pop_front(); - - UpdateUtilization(ThreadBusy); - } - - double st = Utility::GetTime(); - -#ifdef I2_DEBUG -# ifdef RUSAGE_THREAD - struct rusage usage_start, usage_end; - - (void) getrusage(RUSAGE_THREAD, &usage_start); -# endif /* RUSAGE_THREAD */ -#endif /* I2_DEBUG */ - - try { - if (wi.Callback) - wi.Callback(); - } catch (const std::exception& ex) { - Log(LogCritical, "ThreadPool") - << "Exception thrown in event handler:\n" - << DiagnosticInformation(ex); - } catch (...) { - Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler."); - } - - double et = Utility::GetTime(); - double latency = st - wi.Timestamp; - - { - boost::mutex::scoped_lock lock(queue.Mutex); - - queue.WaitTime += latency; - queue.ServiceTime += et - st; - queue.TaskCount++; - } - -#ifdef I2_DEBUG -# ifdef RUSAGE_THREAD - (void) getrusage(RUSAGE_THREAD, &usage_end); - - double duser = (usage_end.ru_utime.tv_sec - usage_start.ru_utime.tv_sec) + - (usage_end.ru_utime.tv_usec - usage_start.ru_utime.tv_usec) / 1000000.0; - - double dsys = (usage_end.ru_stime.tv_sec - usage_start.ru_stime.tv_sec) + - (usage_end.ru_stime.tv_usec - usage_start.ru_stime.tv_usec) / 1000000.0; - - double dwait = (et - st) - (duser + dsys); - - int dminfaults = usage_end.ru_minflt - usage_start.ru_minflt; - int dmajfaults = usage_end.ru_majflt - usage_start.ru_majflt; - - int dvctx = usage_end.ru_nvcsw - usage_start.ru_nvcsw; - int divctx = usage_end.ru_nivcsw - usage_start.ru_nivcsw; -# endif /* RUSAGE_THREAD */ - if (et - st > 0.5) { - Log(LogWarning, "ThreadPool") -# ifdef RUSAGE_THREAD - << "Event call took user:" << duser << "s, system:" << dsys << "s, wait:" << dwait << "s, minor_faults:" << dminfaults << ", major_faults:" << dmajfaults << ", voluntary_csw:" << dvctx << ", involuntary_csw:" << divctx; -# else - << "Event call took " << (et - st) << "s"; -# endif /* RUSAGE_THREAD */ - } -#endif /* I2_DEBUG */ - } - - boost::mutex::scoped_lock lock(queue.Mutex); - UpdateUtilization(ThreadDead); - Zombie = false; -} - -/** - * Appends a work item to the work queue. Work items will be processed in FIFO order. - * - * @param callback The callback function for the work item. - * @param policy The scheduling policy - * @returns true if the item was queued, false otherwise. - */ -bool ThreadPool::Post(const ThreadPool::WorkFunction& callback, SchedulerPolicy policy) -{ - WorkItem wi; - wi.Callback = callback; - wi.Timestamp = Utility::GetTime(); - - Queue& queue = m_Queues[Utility::Random() % (sizeof(m_Queues) / sizeof(m_Queues[0]))]; - - { - boost::mutex::scoped_lock lock(queue.Mutex); - - if (queue.Stopped) - return false; - - if (policy == LowLatencyScheduler) - queue.SpawnWorker(m_ThreadGroup); - - queue.Items.emplace_back(std::move(wi)); - queue.CV.notify_one(); - } - - return true; -} - -void ThreadPool::ManagerThreadProc() -{ - std::ostringstream idbuf; - idbuf << "TP #" << m_ID << " Manager"; - Utility::SetThreadName(idbuf.str()); - - double lastStats = 0; - - for (;;) { - size_t total_pending = 0, total_alive = 0; - double total_avg_latency = 0; - double total_utilization = 0; - - { - boost::mutex::scoped_lock lock(m_MgmtMutex); - - if (!m_Stopped) - m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(500)); - - if (m_Stopped) - break; - } - - for (auto& queue : m_Queues) { - size_t pending, alive = 0; - double avg_latency; - double utilization = 0; - - boost::mutex::scoped_lock lock(queue.Mutex); - - for (auto& thread : queue.Threads) - thread.UpdateUtilization(); - - pending = queue.Items.size(); - - for (auto& thread : queue.Threads) { - if (thread.State != ThreadDead && !thread.Zombie) { - alive++; - utilization += thread.Utilization * 100; - } - } - - utilization /= alive; - - if (queue.TaskCount > 0) - avg_latency = queue.WaitTime / (queue.TaskCount * 1.0); - else - avg_latency = 0; - - if (utilization < 60 || utilization > 80 || alive < 8) { - double wthreads = std::ceil((utilization * alive) / 80.0); - - int tthreads = wthreads - alive; - - /* Make sure there is at least one thread per queue */ - if (alive + tthreads < 1) - tthreads = 1 - alive; - - /* Don't kill more than 2 threads at once. */ - if (tthreads < -2) - tthreads = -2; - - /* Spawn more workers if there are outstanding work items. */ - if (tthreads > 0 && pending > 0) - tthreads = 2; - - if (m_MaxThreads != UINT_MAX && (alive + tthreads) * (sizeof(m_Queues) / sizeof(m_Queues[0])) > m_MaxThreads) - tthreads = m_MaxThreads / (sizeof(m_Queues) / sizeof(m_Queues[0])) - alive; - - if (tthreads != 0) { - Log(LogNotice, "ThreadPool") - << "Thread pool; current: " << alive << "; adjustment: " << tthreads; - } - - for (int i = 0; i < -tthreads; i++) - queue.KillWorker(m_ThreadGroup); - - for (int i = 0; i < tthreads; i++) - queue.SpawnWorker(m_ThreadGroup); - } - - queue.WaitTime = 0; - queue.ServiceTime = 0; - queue.TaskCount = 0; - - total_pending += pending; - total_alive += alive; - total_avg_latency += avg_latency; - total_utilization += utilization; - } - - double now = Utility::GetTime(); - - if (lastStats < now - 15) { - lastStats = now; - - Log(LogNotice, "ThreadPool") - << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: " - << (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms" - << "; Threads: " << total_alive - << "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%"; - } - } -} - -/** - * Note: Caller must hold m_Mutex - */ -void ThreadPool::Queue::SpawnWorker(boost::thread_group& group) -{ - for (auto& thread : Threads) { - if (thread.State == ThreadDead) { - Log(LogDebug, "ThreadPool", "Spawning worker thread."); - - thread = WorkerThread(ThreadIdle); - thread.Thread = group.create_thread(std::bind(&ThreadPool::WorkerThread::ThreadProc, std::ref(thread), std::ref(*this))); - - break; - } - } -} - -/** - * Note: Caller must hold Mutex. - */ -void ThreadPool::Queue::KillWorker(boost::thread_group& group) -{ - for (auto& thread : Threads) { - if (thread.State == ThreadIdle && !thread.Zombie) { - Log(LogDebug, "ThreadPool", "Killing worker thread."); - - group.remove_thread(thread.Thread); - thread.Thread->detach(); - delete thread.Thread; - - thread.Zombie = true; - CV.notify_all(); - - break; - } - } -} - -/** - * Note: Caller must hold queue Mutex. - */ -void ThreadPool::WorkerThread::UpdateUtilization(ThreadState state) +void ThreadPool::Stop() { - double utilization; + boost::unique_lock lock (m_Mutex); - switch (State) { - case ThreadDead: - return; - case ThreadIdle: - utilization = 0; - break; - case ThreadBusy: - utilization = 1; - break; - default: - VERIFY(0); + if (m_Pool) { + m_Pool->join(); + m_Pool = nullptr; } - - double now = Utility::GetTime(); - double time = now - LastUpdate; - - const double avg_time = 5.0; - - if (time > avg_time) - time = avg_time; - - Utilization = (Utilization * (avg_time - time) + utilization * time) / avg_time; - LastUpdate = now; - - if (state != ThreadUnspecified) - State = state; } diff --git a/lib/base/threadpool.hpp b/lib/base/threadpool.hpp index 742e891c5..3a803f295 100644 --- a/lib/base/threadpool.hpp +++ b/lib/base/threadpool.hpp @@ -3,18 +3,21 @@ #ifndef THREADPOOL_H #define THREADPOOL_H -#include "base/i2-base.hpp" -#include -#include -#include -#include +#include "base/exception.hpp" +#include "base/logger.hpp" +#include +#include +#include +#include #include +#include +#include +#include +#include namespace icinga { -#define QUEUECOUNT 4U - enum SchedulerPolicy { DefaultScheduler, @@ -31,83 +34,46 @@ class ThreadPool public: typedef std::function WorkFunction; - ThreadPool(size_t max_threads = UINT_MAX); + ThreadPool(size_t threads = std::thread::hardware_concurrency() * 2u); ~ThreadPool(); void Start(); void Stop(); - bool Post(const WorkFunction& callback, SchedulerPolicy policy = DefaultScheduler); - -private: - enum ThreadState - { - ThreadUnspecified, - ThreadDead, - ThreadIdle, - ThreadBusy - }; - - struct WorkItem - { - WorkFunction Callback; - double Timestamp; - }; - - struct Queue; - - struct WorkerThread - { - ThreadState State{ThreadDead}; - bool Zombie{false}; - double Utilization{0}; - double LastUpdate{0}; - boost::thread *Thread{nullptr}; - - WorkerThread(ThreadState state = ThreadDead) - : State(state) - { } - - void UpdateUtilization(ThreadState state = ThreadUnspecified); - - void ThreadProc(Queue& queue); - }; - - struct Queue + /** + * Appends a work item to the work queue. Work items will be processed in FIFO order. + * + * @param callback The callback function for the work item. + * @returns true if the item was queued, false otherwise. + */ + template + bool Post(T callback, SchedulerPolicy) { - boost::mutex Mutex; - boost::condition_variable CV; - boost::condition_variable CVStarved; - - std::deque Items; - - double WaitTime{0}; - double ServiceTime{0}; - int TaskCount{0}; - - bool Stopped{false}; - - WorkerThread Threads[16]; - - void SpawnWorker(boost::thread_group& group); - void KillWorker(boost::thread_group& group); - }; + boost::shared_lock lock (m_Mutex); + + if (m_Pool) { + boost::asio::post(*m_Pool, [callback]() { + try { + callback(); + } catch (const std::exception& ex) { + Log(LogCritical, "ThreadPool") + << "Exception thrown in event handler:\n" + << DiagnosticInformation(ex); + } catch (...) { + Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler."); + } + }); + + return true; + } else { + return false; + } + } - int m_ID; - static int m_NextID; - - size_t m_MaxThreads; - - boost::thread_group m_ThreadGroup; - - std::thread m_MgmtThread; - boost::mutex m_MgmtMutex; - boost::condition_variable m_MgmtCV; - bool m_Stopped{true}; - - Queue m_Queues[QUEUECOUNT]; - - void ManagerThreadProc(); +private: + boost::shared_mutex m_Mutex; + std::unique_ptr m_Pool; + size_t m_Threads; }; } -- 2.40.0