From 375c5a53c6bf370b72cc47713c71ec5dc5252f22 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Sun, 3 Nov 2013 10:03:11 +0100 Subject: [PATCH] Don't join threads that are already dead. Refs #4990 --- lib/base/threadpool.cpp | 67 ++++++++++++++++++----------------------- lib/base/threadpool.h | 11 +++---- 2 files changed, 33 insertions(+), 45 deletions(-) diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index cfefdee14..2ffcb7dff 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -40,8 +40,8 @@ ThreadPool::ThreadPool(void) for (int i = 0; i < 2; i++) SpawnWorker(); - m_ManagerThread = boost::thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); - m_StatsThread = boost::thread(boost::bind(&ThreadPool::StatsThreadProc, this)); + m_Threads.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); + m_Threads.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); } ThreadPool::~ThreadPool(void) @@ -63,24 +63,17 @@ void ThreadPool::Stop(void) */ void ThreadPool::Join(void) { - boost::mutex::scoped_lock lock(m_Mutex); - - while (!m_Stopped || !m_WorkItems.empty()) { - lock.unlock(); - Utility::Sleep(0.5); - lock.lock(); - } - - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - lock.unlock(); - m_Threads[i].Thread.join(); - lock.lock(); + { + boost::mutex::scoped_lock lock(m_Mutex); - m_Threads[i].State = ThreadDead; + while (!m_Stopped || !m_WorkItems.empty()) { + lock.unlock(); + Utility::Sleep(0.5); + lock.lock(); + } } - m_ManagerThread.join(); - m_StatsThread.join(); + m_Threads.join_all(); } /** @@ -100,10 +93,10 @@ void ThreadPool::QueueThreadProc(int tid) UpdateThreadUtilization(tid, ThreadIdle); - while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie) + while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie) m_WorkCV.wait(lock); - if (m_Threads[tid].Zombie) + if (m_ThreadStats[tid].Zombie) break; if (m_WorkItems.empty() && m_Stopped) @@ -184,7 +177,7 @@ void ThreadPool::QueueThreadProc(int tid) boost::mutex::scoped_lock lock(m_Mutex); UpdateThreadUtilization(tid, ThreadDead); - m_Threads[tid].Zombie = false; + m_ThreadStats[tid].Zombie = false; } /** @@ -236,10 +229,10 @@ void ThreadPool::ManagerThreadProc(void) alive = 0; - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) { + for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { + if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) { alive++; - utilization += m_Threads[i].Utilization * 100; + utilization += m_ThreadStats[i].Utilization * 100; } } @@ -297,14 +290,12 @@ void ThreadPool::ManagerThreadProc(void) */ void ThreadPool::SpawnWorker(void) { - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State == ThreadDead) { + for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { + if (m_ThreadStats[i].State == ThreadDead) { Log(LogDebug, "debug", "Spawning worker thread."); - m_Threads[i].State = ThreadIdle; - - boost::thread thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); - m_Threads[i].Thread = boost::move(thread); + m_ThreadStats[i] = ThreadStats(ThreadIdle); + m_Threads.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); break; } @@ -316,11 +307,11 @@ void ThreadPool::SpawnWorker(void) */ void ThreadPool::KillWorker(void) { - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) { + for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { + if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) { Log(LogDebug, "base", "Killing worker thread."); - m_Threads[i].Zombie = true; + m_ThreadStats[i].Zombie = true; m_WorkCV.notify_all(); break; @@ -343,7 +334,7 @@ void ThreadPool::StatsThreadProc(void) if (m_Stopped) break; - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) + for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) UpdateThreadUtilization(i); } } @@ -355,7 +346,7 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) { double utilization; - switch (m_Threads[tid].State) { + switch (m_ThreadStats[tid].State) { case ThreadDead: return; case ThreadIdle: @@ -369,16 +360,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) } double now = Utility::GetTime(); - double time = now - m_Threads[tid].LastUpdate; + double time = now - m_ThreadStats[tid].LastUpdate; const double avg_time = 5.0; if (time > avg_time) time = avg_time; - m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; - m_Threads[tid].LastUpdate = now; + m_ThreadStats[tid].Utilization = (m_ThreadStats[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; + m_ThreadStats[tid].LastUpdate = now; if (state != ThreadUnspecified) - m_Threads[tid].State = state; + m_ThreadStats[tid].State = state; } diff --git a/lib/base/threadpool.h b/lib/base/threadpool.h index c59960e42..e1abdfb0d 100644 --- a/lib/base/threadpool.h +++ b/lib/base/threadpool.h @@ -57,15 +57,14 @@ private: ThreadBusy }; - struct WorkerThread + struct ThreadStats { - boost::thread Thread; ThreadState State; bool Zombie; double Utilization; double LastUpdate; - WorkerThread(ThreadState state = ThreadDead) + ThreadStats(ThreadState state = ThreadDead) : State(state), Zombie(false), Utilization(0), LastUpdate(0) { } }; @@ -73,10 +72,8 @@ private: int m_ID; static int m_NextID; - WorkerThread m_Threads[512]; - - boost::thread m_ManagerThread; - boost::thread m_StatsThread; + boost::thread_group m_Threads; + ThreadStats m_ThreadStats[512]; double m_WaitTime; double m_ServiceTime; -- 2.40.0