]> granicus.if.org Git - icinga2/commitdiff
ThreadPool: use the Boost ASIO thread pool under the hood
authorAlexander A. Klimov <alexander.klimov@icinga.com>
Mon, 1 Apr 2019 15:05:16 +0000 (17:05 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Thu, 25 Apr 2019 06:25:28 +0000 (08:25 +0200)
lib/base/threadpool.cpp
lib/base/threadpool.hpp

index 0021bb84b900182ab691127241417fce485b1253..d7819ef4a7ef416f12abeb968baae4523f61c9cd 100644 (file)
@@ -1,23 +1,13 @@
 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
 
 #include "base/threadpool.hpp"
-#include "base/logger.hpp"
-#include "base/debug.hpp"
-#include "base/utility.hpp"
-#include "base/exception.hpp"
-#include "base/application.hpp"
-#include <iostream>
+#include <boost/thread/locks.hpp>
 
 using namespace icinga;
 
-int ThreadPool::m_NextID = 1;
-
-ThreadPool::ThreadPool(size_t max_threads)
-       : m_ID(m_NextID++), m_MaxThreads(max_threads)
+ThreadPool::ThreadPool(size_t threads)
+       : m_Threads(threads)
 {
-       if (m_MaxThreads != UINT_MAX && m_MaxThreads < sizeof(m_Queues) / sizeof(m_Queues[0]))
-               m_MaxThreads = sizeof(m_Queues) / sizeof(m_Queues[0]);
-
        Start();
 }
 
@@ -28,354 +18,19 @@ ThreadPool::~ThreadPool()
 
 void ThreadPool::Start()
 {
-       if (!m_Stopped)
-               return;
-
-       m_Stopped = false;
-
-       for (auto& queue : m_Queues)
-               queue.SpawnWorker(m_ThreadGroup);
+       boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex);
 
-       m_MgmtThread = std::thread(std::bind(&ThreadPool::ManagerThreadProc, this));
-}
-
-void ThreadPool::Stop()
-{
-       if (m_Stopped)
-               return;
-
-       {
-               boost::mutex::scoped_lock lock(m_MgmtMutex);
-               m_Stopped = true;
-               m_MgmtCV.notify_all();
-       }
-
-       if (m_MgmtThread.joinable())
-               m_MgmtThread.join();
-
-       for (auto& queue : m_Queues) {
-               boost::mutex::scoped_lock lock(queue.Mutex);
-               queue.Stopped = true;
-               queue.CV.notify_all();
+       if (!m_Pool) {
+               m_Pool = decltype(m_Pool)(new boost::asio::thread_pool(m_Threads));
        }
-
-       m_ThreadGroup.join_all();
-       m_ThreadGroup.~thread_group();
-       new (&m_ThreadGroup) boost::thread_group();
-
-       for (auto& queue : m_Queues)
-               queue.Stopped = false;
-
-       m_Stopped = true;
 }
 
-/**
- * Waits for work items and processes them.
- */
-void ThreadPool::WorkerThread::ThreadProc(Queue& queue)
-{
-       std::ostringstream idbuf;
-       idbuf << "Q #" << &queue << " W #" << this;
-       Utility::SetThreadName(idbuf.str());
-
-       for (;;) {
-               WorkItem wi;
-
-               {
-                       boost::mutex::scoped_lock lock(queue.Mutex);
-
-                       UpdateUtilization(ThreadIdle);
-
-                       while (queue.Items.empty() && !queue.Stopped && !Zombie) {
-                               if (queue.Items.empty())
-                                       queue.CVStarved.notify_all();
-
-                               queue.CV.wait(lock);
-                       }
-
-                       if (Zombie)
-                               break;
-
-                       if (queue.Items.empty() && queue.Stopped)
-                               break;
-
-                       wi = queue.Items.front();
-                       queue.Items.pop_front();
-
-                       UpdateUtilization(ThreadBusy);
-               }
-
-               double st = Utility::GetTime();
-
-#ifdef I2_DEBUG
-#      ifdef RUSAGE_THREAD
-               struct rusage usage_start, usage_end;
-
-               (void) getrusage(RUSAGE_THREAD, &usage_start);
-#      endif /* RUSAGE_THREAD */
-#endif /* I2_DEBUG */
-
-               try {
-                       if (wi.Callback)
-                               wi.Callback();
-               } catch (const std::exception& ex) {
-                       Log(LogCritical, "ThreadPool")
-                               << "Exception thrown in event handler:\n"
-                               << DiagnosticInformation(ex);
-               } catch (...) {
-                       Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler.");
-               }
-
-               double et = Utility::GetTime();
-               double latency = st - wi.Timestamp;
-
-               {
-                       boost::mutex::scoped_lock lock(queue.Mutex);
-
-                       queue.WaitTime += latency;
-                       queue.ServiceTime += et - st;
-                       queue.TaskCount++;
-               }
-
-#ifdef I2_DEBUG
-#      ifdef RUSAGE_THREAD
-               (void) getrusage(RUSAGE_THREAD, &usage_end);
-
-               double duser = (usage_end.ru_utime.tv_sec - usage_start.ru_utime.tv_sec) +
-                       (usage_end.ru_utime.tv_usec - usage_start.ru_utime.tv_usec) / 1000000.0;
-
-               double dsys = (usage_end.ru_stime.tv_sec - usage_start.ru_stime.tv_sec) +
-                       (usage_end.ru_stime.tv_usec - usage_start.ru_stime.tv_usec) / 1000000.0;
-
-               double dwait = (et - st) - (duser + dsys);
-
-               int dminfaults = usage_end.ru_minflt - usage_start.ru_minflt;
-               int dmajfaults = usage_end.ru_majflt - usage_start.ru_majflt;
-
-               int dvctx = usage_end.ru_nvcsw - usage_start.ru_nvcsw;
-               int divctx = usage_end.ru_nivcsw - usage_start.ru_nivcsw;
-#      endif /* RUSAGE_THREAD */
-               if (et - st > 0.5) {
-                       Log(LogWarning, "ThreadPool")
-#      ifdef RUSAGE_THREAD
-                               << "Event call took user:" << duser << "s, system:" << dsys << "s, wait:" << dwait << "s, minor_faults:" << dminfaults << ", major_faults:" << dmajfaults << ", voluntary_csw:" << dvctx << ", involuntary_csw:" << divctx;
-#      else
-                               << "Event call took " << (et - st) << "s";
-#      endif /* RUSAGE_THREAD */
-               }
-#endif /* I2_DEBUG */
-       }
-
-       boost::mutex::scoped_lock lock(queue.Mutex);
-       UpdateUtilization(ThreadDead);
-       Zombie = false;
-}
-
-/**
- * Appends a work item to the work queue. Work items will be processed in FIFO order.
- *
- * @param callback The callback function for the work item.
- * @param policy The scheduling policy
- * @returns true if the item was queued, false otherwise.
- */
-bool ThreadPool::Post(const ThreadPool::WorkFunction& callback, SchedulerPolicy policy)
-{
-       WorkItem wi;
-       wi.Callback = callback;
-       wi.Timestamp = Utility::GetTime();
-
-       Queue& queue = m_Queues[Utility::Random() % (sizeof(m_Queues) / sizeof(m_Queues[0]))];
-
-       {
-               boost::mutex::scoped_lock lock(queue.Mutex);
-
-               if (queue.Stopped)
-                       return false;
-
-               if (policy == LowLatencyScheduler)
-                       queue.SpawnWorker(m_ThreadGroup);
-
-               queue.Items.emplace_back(std::move(wi));
-               queue.CV.notify_one();
-       }
-
-       return true;
-}
-
-void ThreadPool::ManagerThreadProc()
-{
-       std::ostringstream idbuf;
-       idbuf << "TP #" << m_ID << " Manager";
-       Utility::SetThreadName(idbuf.str());
-
-       double lastStats = 0;
-
-       for (;;) {
-               size_t total_pending = 0, total_alive = 0;
-               double total_avg_latency = 0;
-               double total_utilization = 0;
-
-               {
-                       boost::mutex::scoped_lock lock(m_MgmtMutex);
-
-                       if (!m_Stopped)
-                               m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(500));
-
-                       if (m_Stopped)
-                               break;
-               }
-
-               for (auto& queue : m_Queues) {
-                       size_t pending, alive = 0;
-                       double avg_latency;
-                       double utilization = 0;
-
-                               boost::mutex::scoped_lock lock(queue.Mutex);
-
-                       for (auto& thread : queue.Threads)
-                               thread.UpdateUtilization();
-
-                       pending = queue.Items.size();
-
-                       for (auto& thread : queue.Threads) {
-                               if (thread.State != ThreadDead && !thread.Zombie) {
-                                       alive++;
-                                       utilization += thread.Utilization * 100;
-                               }
-                       }
-
-                       utilization /= alive;
-
-                       if (queue.TaskCount > 0)
-                               avg_latency = queue.WaitTime / (queue.TaskCount * 1.0);
-                       else
-                               avg_latency = 0;
-
-                       if (utilization < 60 || utilization > 80 || alive < 8) {
-                               double wthreads = std::ceil((utilization * alive) / 80.0);
-
-                               int tthreads = wthreads - alive;
-
-                               /* Make sure there is at least one thread per queue */
-                               if (alive + tthreads < 1)
-                                       tthreads = 1 - alive;
-
-                               /* Don't kill more than 2 threads at once. */
-                               if (tthreads < -2)
-                                       tthreads = -2;
-
-                               /* Spawn more workers if there are outstanding work items. */
-                               if (tthreads > 0 && pending > 0)
-                                       tthreads = 2;
-
-                               if (m_MaxThreads != UINT_MAX && (alive + tthreads) * (sizeof(m_Queues) / sizeof(m_Queues[0])) > m_MaxThreads)
-                                       tthreads = m_MaxThreads / (sizeof(m_Queues) / sizeof(m_Queues[0])) - alive;
-
-                               if (tthreads != 0) {
-                                       Log(LogNotice, "ThreadPool")
-                                               << "Thread pool; current: " << alive << "; adjustment: " << tthreads;
-                               }
-
-                               for (int i = 0; i < -tthreads; i++)
-                                       queue.KillWorker(m_ThreadGroup);
-
-                               for (int i = 0; i < tthreads; i++)
-                                       queue.SpawnWorker(m_ThreadGroup);
-                       }
-
-                       queue.WaitTime = 0;
-                       queue.ServiceTime = 0;
-                       queue.TaskCount = 0;
-
-                       total_pending += pending;
-                       total_alive += alive;
-                       total_avg_latency += avg_latency;
-                       total_utilization += utilization;
-               }
-
-               double now = Utility::GetTime();
-
-               if (lastStats < now - 15) {
-                       lastStats = now;
-
-                       Log(LogNotice, "ThreadPool")
-                               << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: "
-                               << (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms"
-                               << "; Threads: " << total_alive
-                               << "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%";
-               }
-       }
-}
-
-/**
- * Note: Caller must hold m_Mutex
- */
-void ThreadPool::Queue::SpawnWorker(boost::thread_group& group)
-{
-       for (auto& thread : Threads) {
-               if (thread.State == ThreadDead) {
-                       Log(LogDebug, "ThreadPool", "Spawning worker thread.");
-
-                       thread = WorkerThread(ThreadIdle);
-                       thread.Thread = group.create_thread(std::bind(&ThreadPool::WorkerThread::ThreadProc, std::ref(thread), std::ref(*this)));
-
-                       break;
-               }
-       }
-}
-
-/**
- * Note: Caller must hold Mutex.
- */
-void ThreadPool::Queue::KillWorker(boost::thread_group& group)
-{
-       for (auto& thread : Threads) {
-               if (thread.State == ThreadIdle && !thread.Zombie) {
-                       Log(LogDebug, "ThreadPool", "Killing worker thread.");
-
-                       group.remove_thread(thread.Thread);
-                       thread.Thread->detach();
-                       delete thread.Thread;
-
-                       thread.Zombie = true;
-                       CV.notify_all();
-
-                       break;
-               }
-       }
-}
-
-/**
- * Note: Caller must hold queue Mutex.
- */
-void ThreadPool::WorkerThread::UpdateUtilization(ThreadState state)
+void ThreadPool::Stop()
 {
-       double utilization;
+       boost::unique_lock<decltype(m_Mutex)> lock (m_Mutex);
 
-       switch (State) {
-               case ThreadDead:
-                       return;
-               case ThreadIdle:
-                       utilization = 0;
-                       break;
-               case ThreadBusy:
-                       utilization = 1;
-                       break;
-               default:
-                       VERIFY(0);
+       if (m_Pool) {
+               m_Pool->join();
+               m_Pool = nullptr;
        }
-
-       double now = Utility::GetTime();
-       double time = now - LastUpdate;
-
-       const double avg_time = 5.0;
-
-       if (time > avg_time)
-               time = avg_time;
-
-       Utilization = (Utilization * (avg_time - time) + utilization * time) / avg_time;
-       LastUpdate = now;
-
-       if (state != ThreadUnspecified)
-               State = state;
 }
index 742e891c53527a56633d4fead0573b70ff8b9fd0..3a803f295f3f74e06fb8636a9ba5c9a188e3802a 100644 (file)
@@ -3,18 +3,21 @@
 #ifndef THREADPOOL_H
 #define THREADPOOL_H
 
-#include "base/i2-base.hpp"
-#include <boost/thread/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <deque>
+#include "base/exception.hpp"
+#include "base/logger.hpp"
+#include <cstddef>
+#include <exception>
+#include <functional>
+#include <memory>
 #include <thread>
+#include <boost/asio/post.hpp>
+#include <boost/asio/thread_pool.hpp>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/shared_mutex.hpp>
 
 namespace icinga
 {
 
-#define QUEUECOUNT 4U
-
 enum SchedulerPolicy
 {
        DefaultScheduler,
@@ -31,83 +34,46 @@ class ThreadPool
 public:
        typedef std::function<void ()> WorkFunction;
 
-       ThreadPool(size_t max_threads = UINT_MAX);
+       ThreadPool(size_t threads = std::thread::hardware_concurrency() * 2u);
        ~ThreadPool();
 
        void Start();
        void Stop();
 
-       bool Post(const WorkFunction& callback, SchedulerPolicy policy = DefaultScheduler);
-
-private:
-       enum ThreadState
-       {
-               ThreadUnspecified,
-               ThreadDead,
-               ThreadIdle,
-               ThreadBusy
-       };
-
-       struct WorkItem
-       {
-               WorkFunction Callback;
-               double Timestamp;
-       };
-
-       struct Queue;
-
-       struct WorkerThread
-       {
-               ThreadState State{ThreadDead};
-               bool Zombie{false};
-               double Utilization{0};
-               double LastUpdate{0};
-               boost::thread *Thread{nullptr};
-
-               WorkerThread(ThreadState state = ThreadDead)
-                       : State(state)
-               { }
-
-               void UpdateUtilization(ThreadState state = ThreadUnspecified);
-
-               void ThreadProc(Queue& queue);
-       };
-
-       struct Queue
+       /**
+        * Appends a work item to the work queue. Work items will be processed in FIFO order.
+        *
+        * @param callback The callback function for the work item.
+        * @returns true if the item was queued, false otherwise.
+        */
+       template<class T>
+       bool Post(T callback, SchedulerPolicy)
        {
-               boost::mutex Mutex;
-               boost::condition_variable CV;
-               boost::condition_variable CVStarved;
-
-               std::deque<WorkItem> Items;
-
-               double WaitTime{0};
-               double ServiceTime{0};
-               int TaskCount{0};
-
-               bool Stopped{false};
-
-               WorkerThread Threads[16];
-
-               void SpawnWorker(boost::thread_group& group);
-               void KillWorker(boost::thread_group& group);
-       };
+               boost::shared_lock<decltype(m_Mutex)> lock (m_Mutex);
+
+               if (m_Pool) {
+                       boost::asio::post(*m_Pool, [callback]() {
+                               try {
+                                       callback();
+                               } catch (const std::exception& ex) {
+                                       Log(LogCritical, "ThreadPool")
+                                               << "Exception thrown in event handler:\n"
+                                               << DiagnosticInformation(ex);
+                               } catch (...) {
+                                       Log(LogCritical, "ThreadPool", "Exception of unknown type thrown in event handler.");
+                               }
+                       });
+
+                       return true;
+               } else {
+                       return false;
+               }
+       }
 
-       int m_ID;
-       static int m_NextID;
-
-       size_t m_MaxThreads;
-
-       boost::thread_group m_ThreadGroup;
-
-       std::thread m_MgmtThread;
-       boost::mutex m_MgmtMutex;
-       boost::condition_variable m_MgmtCV;
-       bool m_Stopped{true};
-
-       Queue m_Queues[QUEUECOUNT];
-
-       void ManagerThreadProc();
+private:
+       boost::shared_mutex m_Mutex;
+       std::unique_ptr<boost::asio::thread_pool> m_Pool;
+       size_t m_Threads;
 };
 
 }