]> granicus.if.org Git - icinga2/commitdiff
EventQueue: Further changes to how we determine the optimal number of threads.
authorGunnar Beutner <gunnar.beutner@netways.de>
Mon, 25 Mar 2013 15:12:25 +0000 (16:12 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 25 Mar 2013 15:12:25 +0000 (16:12 +0100)
lib/base/eventqueue.cpp
lib/base/eventqueue.h
lib/python/pythoninterpreter.cpp

index 00ae9b80c8611df93107dc635be4d808ee7eb827..ddd607e121eb8d812a55ddcff00740fae1fb2668 100644 (file)
@@ -22,6 +22,7 @@
 #include "base/convert.h"
 #include "base/utility.h"
 #include <sstream>
+#include <iostream>
 #include <boost/bind.hpp>
 #include <boost/exception/diagnostic_information.hpp>
 #include <boost/foreach.hpp>
@@ -29,7 +30,7 @@
 using namespace icinga;
 
 EventQueue::EventQueue(void)
-       : m_Stopped(false), m_ThreadDeaths(0), m_Latency(0), m_LatencyCount(0)
+       : m_Stopped(false), m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0)
 {
        for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++)
                m_ThreadStates[i] = ThreadDead;
@@ -76,6 +77,9 @@ void EventQueue::QueueThreadProc(int tid)
        for (;;) {
                EventQueueWorkItem event;
 
+               double ws = Utility::GetTime();
+               double st;
+
                {
                        boost::mutex::scoped_lock lock(m_Mutex);
 
@@ -96,19 +100,11 @@ void EventQueue::QueueThreadProc(int tid)
                        m_Events.pop_front();
 
                        m_ThreadStates[tid] = ThreadBusy;
-
-                       double latency = Utility::GetTime() - event.Timestamp;
-
-                       m_Latency += latency;
-                       m_LatencyCount++;
-
-                       if (latency > m_MaxLatency)
-                               m_MaxLatency = latency;
+                       st = Utility::GetTime();
+                       UpdateThreadUtilization(tid, st - ws, 0);
                }
 
 #ifdef _DEBUG
-               double st = Utility::GetTime();
-
 #      ifdef RUSAGE_THREAD
                struct rusage usage_start, usage_end;
 
@@ -128,8 +124,23 @@ void EventQueue::QueueThreadProc(int tid)
                        Log(LogCritical, "base", "Exception of unknown type thrown in event handler.");
                }
 
-#ifdef _DEBUG
                double et = Utility::GetTime();
+               double latency = st - event.Timestamp;
+
+               {
+                       boost::mutex::scoped_lock lock(m_Mutex);
+
+                       m_WaitTime += latency;
+                       m_ServiceTime += et - st;
+                       m_TaskCount++;
+
+                       if (latency > m_MaxLatency)
+                               m_MaxLatency = latency;
+
+                       UpdateThreadUtilization(tid, et - st, 1);
+               }
+
+#ifdef _DEBUG
 #      ifdef RUSAGE_THREAD
                (void) getrusage(RUSAGE_THREAD, &usage_end);
 
@@ -190,7 +201,7 @@ void EventQueue::ManagerThreadProc(void)
 
                double now = Utility::GetTime();
 
-               int pending, alive, busy;
+               int pending, alive;
                double avg_latency, max_latency;
 
                {
@@ -198,39 +209,58 @@ void EventQueue::ManagerThreadProc(void)
                        pending = m_Events.size();
 
                        alive = 0;
-                       busy = 0;
+
+                       double util = 0;
+                       int hg = 0;
 
                        for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
-                               if (m_ThreadStates[i] != ThreadDead)
+                               if (m_ThreadStates[i] != ThreadDead) {
                                        alive++;
-
-                               if (m_ThreadStates[i] == ThreadBusy)
-                                       busy++;
+                                       util += m_ThreadUtilization[i] * 100;
+                                       std::cout << (int)(m_ThreadUtilization[i] * 100) << "\t";
+                                       hg++;
+                                       if (hg % 25 == 0)
+                                               std::cout << std::endl;
+                               }
                        }
 
-                       if (m_LatencyCount > 0)
-                               avg_latency = m_Latency / (m_LatencyCount * 1.0);
+                       util /= alive;
+
+                       std::cout << std::endl;
+
+                       if (m_TaskCount > 0)
+                               avg_latency = m_WaitTime / (m_TaskCount * 1.0);
                        else
                                avg_latency = 0;
 
-                       m_Latency = 0;
-                       m_LatencyCount = 0;
+                       std::cout << "Wait time: " << m_WaitTime << "; Service time: " << m_ServiceTime << "; tasks: " << m_TaskCount << std::endl;
+                       std::cout << "Thread util: " << util << std::endl;
 
-                       max_latency = m_MaxLatency;
-                       m_MaxLatency = 0;
+                       if (util < 60 || util > 80) {
+                               int tthreads = ceil((util * alive) / 80.0) - alive;
 
-                       if (max_latency > 0.1) {
-                               /* Spawn a few additional workers. */
-                               for (int i = 0; i < 8; i++)
+                               if (alive + tthreads < 2)
+                                       tthreads = 2 - alive;
+
+                               std::cout << "Target threads: " << tthreads << "; Alive: " << alive << std::endl;
+
+                               for (int i = 0; i < -tthreads; i++)
+                                       KillWorker();
+
+                               for (int i = 0; i < tthreads; i++)
                                        SpawnWorker();
-                       } else if (alive > busy + 2) {
-                               KillWorker();
                        }
+
+                       m_WaitTime = 0;
+                       m_ServiceTime = 0;
+                       m_TaskCount = 0;
+
+                       max_latency = m_MaxLatency;
+                       m_MaxLatency = 0;
                }
 
                std::ostringstream msgbuf;
-               msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy
-                   << "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms"
+               msgbuf << "Pending tasks: " << pending << "; Average latency: " << (long)(avg_latency * 1000) << "ms"
                    << "; Max latency: " << (long)(max_latency * 1000) << "ms";
                Log(LogInformation, "base", msgbuf.str());
        }
@@ -246,6 +276,7 @@ void EventQueue::SpawnWorker(void)
                        Log(LogDebug, "debug", "Spawning worker thread.");
 
                        m_ThreadStates[i] = ThreadIdle;
+                       m_ThreadUtilization[i] = 0;
                        boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i));
                        worker.detach();
 
@@ -263,3 +294,16 @@ void EventQueue::KillWorker(void)
 
        m_ThreadDeaths++;
 }
+
+/**
+ * Note: Caller must hold m_Mutex.
+ */
+void EventQueue::UpdateThreadUtilization(int tid, double time, double utilization)
+{
+       const double avg_time = 5.0;
+
+       if (time > avg_time)
+               time = avg_time;
+
+       m_ThreadUtilization[tid] = (m_ThreadUtilization[tid] * (avg_time - time) + utilization * time) / avg_time;
+}
index 4ab7661c3cfd79368588daf76421c1538bc5384b..d3f3035ddc789b9d3af8f5392e8fe6e8bbe6099a 100644 (file)
@@ -63,10 +63,12 @@ public:
 
 private:
        ThreadState m_ThreadStates[512];
+       double m_ThreadUtilization[512];
        int m_ThreadDeaths;
 
-       double m_Latency;
-       int m_LatencyCount;
+       double m_WaitTime;
+       double m_ServiceTime;
+       int m_TaskCount;
 
        double m_MaxLatency;
 
@@ -81,6 +83,8 @@ private:
 
        void SpawnWorker(void);
        void KillWorker(void);
+
+       void UpdateThreadUtilization(int tid, double time, double utilization);
 };
 
 }
index eae2abf1b497857b8394d6b708b043fe662ea624..95b9b7457387bba63d5f6c3dfcd5903b3ea6d6a6 100644 (file)
@@ -120,9 +120,9 @@ void PythonInterpreter::ProcessCall(const ScriptTask::Ptr& task, const String& f
                Value vresult = PythonLanguage::MarshalFromPython(result);
                Py_DECREF(result);
 
-               Utility::QueueAsyncCallback(boost::bind(&ScriptTask::FinishResult, task, vresult));
+               task->FinishResult(vresult);
        } catch (...) {
-               Utility::QueueAsyncCallback(boost::bind(&ScriptTask::FinishException, task, boost::current_exception()));
+               task->FinishException(boost::current_exception());
        }
 
        m_Language->SetCurrentInterpreter(interp);