From e739dfd88f3e02f22ba8012bb717a316173114ba Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Mon, 25 Mar 2013 16:12:25 +0100 Subject: [PATCH] EventQueue: Further changes to how we determine the optimal number of threads. --- lib/base/eventqueue.cpp | 106 ++++++++++++++++++++++--------- lib/base/eventqueue.h | 8 ++- lib/python/pythoninterpreter.cpp | 4 +- 3 files changed, 83 insertions(+), 35 deletions(-) diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp index 00ae9b80c..ddd607e12 100644 --- a/lib/base/eventqueue.cpp +++ b/lib/base/eventqueue.cpp @@ -22,6 +22,7 @@ #include "base/convert.h" #include "base/utility.h" #include +#include #include #include #include @@ -29,7 +30,7 @@ using namespace icinga; EventQueue::EventQueue(void) - : m_Stopped(false), m_ThreadDeaths(0), m_Latency(0), m_LatencyCount(0) + : m_Stopped(false), m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0) { for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) m_ThreadStates[i] = ThreadDead; @@ -76,6 +77,9 @@ void EventQueue::QueueThreadProc(int tid) for (;;) { EventQueueWorkItem event; + double ws = Utility::GetTime(); + double st; + { boost::mutex::scoped_lock lock(m_Mutex); @@ -96,19 +100,11 @@ void EventQueue::QueueThreadProc(int tid) m_Events.pop_front(); m_ThreadStates[tid] = ThreadBusy; - - double latency = Utility::GetTime() - event.Timestamp; - - m_Latency += latency; - m_LatencyCount++; - - if (latency > m_MaxLatency) - m_MaxLatency = latency; + st = Utility::GetTime(); + UpdateThreadUtilization(tid, st - ws, 0); } #ifdef _DEBUG - double st = Utility::GetTime(); - # ifdef RUSAGE_THREAD struct rusage usage_start, usage_end; @@ -128,8 +124,23 @@ void EventQueue::QueueThreadProc(int tid) Log(LogCritical, "base", "Exception of unknown type thrown in event handler."); } -#ifdef _DEBUG double et = Utility::GetTime(); + double latency = st - event.Timestamp; + + { + boost::mutex::scoped_lock lock(m_Mutex); + + m_WaitTime += latency; + m_ServiceTime += et - st; + m_TaskCount++; + + if (latency > m_MaxLatency) + m_MaxLatency = latency; + + UpdateThreadUtilization(tid, et - st, 1); + } + +#ifdef _DEBUG # ifdef RUSAGE_THREAD (void) getrusage(RUSAGE_THREAD, &usage_end); @@ -190,7 +201,7 @@ void EventQueue::ManagerThreadProc(void) double now = Utility::GetTime(); - int pending, alive, busy; + int pending, alive; double avg_latency, max_latency; { @@ -198,39 +209,58 @@ void EventQueue::ManagerThreadProc(void) pending = m_Events.size(); alive = 0; - busy = 0; + + double util = 0; + int hg = 0; for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) { - if (m_ThreadStates[i] != ThreadDead) + if (m_ThreadStates[i] != ThreadDead) { alive++; - - if (m_ThreadStates[i] == ThreadBusy) - busy++; + util += m_ThreadUtilization[i] * 100; + std::cout << (int)(m_ThreadUtilization[i] * 100) << "\t"; + hg++; + if (hg % 25 == 0) + std::cout << std::endl; + } } - if (m_LatencyCount > 0) - avg_latency = m_Latency / (m_LatencyCount * 1.0); + util /= alive; + + std::cout << std::endl; + + if (m_TaskCount > 0) + avg_latency = m_WaitTime / (m_TaskCount * 1.0); else avg_latency = 0; - m_Latency = 0; - m_LatencyCount = 0; + std::cout << "Wait time: " << m_WaitTime << "; Service time: " << m_ServiceTime << "; tasks: " << m_TaskCount << std::endl; + std::cout << "Thread util: " << util << std::endl; - max_latency = m_MaxLatency; - m_MaxLatency = 0; + if (util < 60 || util > 80) { + int tthreads = ceil((util * alive) / 80.0) - alive; - if (max_latency > 0.1) { - /* Spawn a few additional workers. */ - for (int i = 0; i < 8; i++) + if (alive + tthreads < 2) + tthreads = 2 - alive; + + std::cout << "Target threads: " << tthreads << "; Alive: " << alive << std::endl; + + for (int i = 0; i < -tthreads; i++) + KillWorker(); + + for (int i = 0; i < tthreads; i++) SpawnWorker(); - } else if (alive > busy + 2) { - KillWorker(); } + + m_WaitTime = 0; + m_ServiceTime = 0; + m_TaskCount = 0; + + max_latency = m_MaxLatency; + m_MaxLatency = 0; } std::ostringstream msgbuf; - msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy - << "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms" + msgbuf << "Pending tasks: " << pending << "; Average latency: " << (long)(avg_latency * 1000) << "ms" << "; Max latency: " << (long)(max_latency * 1000) << "ms"; Log(LogInformation, "base", msgbuf.str()); } @@ -246,6 +276,7 @@ void EventQueue::SpawnWorker(void) Log(LogDebug, "debug", "Spawning worker thread."); m_ThreadStates[i] = ThreadIdle; + m_ThreadUtilization[i] = 0; boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i)); worker.detach(); @@ -263,3 +294,16 @@ void EventQueue::KillWorker(void) m_ThreadDeaths++; } + +/** + * Note: Caller must hold m_Mutex. + */ +void EventQueue::UpdateThreadUtilization(int tid, double time, double utilization) +{ + const double avg_time = 5.0; + + if (time > avg_time) + time = avg_time; + + m_ThreadUtilization[tid] = (m_ThreadUtilization[tid] * (avg_time - time) + utilization * time) / avg_time; +} diff --git a/lib/base/eventqueue.h b/lib/base/eventqueue.h index 4ab7661c3..d3f3035dd 100644 --- a/lib/base/eventqueue.h +++ b/lib/base/eventqueue.h @@ -63,10 +63,12 @@ public: private: ThreadState m_ThreadStates[512]; + double m_ThreadUtilization[512]; int m_ThreadDeaths; - double m_Latency; - int m_LatencyCount; + double m_WaitTime; + double m_ServiceTime; + int m_TaskCount; double m_MaxLatency; @@ -81,6 +83,8 @@ private: void SpawnWorker(void); void KillWorker(void); + + void UpdateThreadUtilization(int tid, double time, double utilization); }; } diff --git a/lib/python/pythoninterpreter.cpp b/lib/python/pythoninterpreter.cpp index eae2abf1b..95b9b7457 100644 --- a/lib/python/pythoninterpreter.cpp +++ b/lib/python/pythoninterpreter.cpp @@ -120,9 +120,9 @@ void PythonInterpreter::ProcessCall(const ScriptTask::Ptr& task, const String& f Value vresult = PythonLanguage::MarshalFromPython(result); Py_DECREF(result); - Utility::QueueAsyncCallback(boost::bind(&ScriptTask::FinishResult, task, vresult)); + task->FinishResult(vresult); } catch (...) { - Utility::QueueAsyncCallback(boost::bind(&ScriptTask::FinishException, task, boost::current_exception())); + task->FinishException(boost::current_exception()); } m_Language->SetCurrentInterpreter(interp); -- 2.40.0