]> granicus.if.org Git - icinga2/commitdiff
Implement WorkQueue metric stats and periodic logging 5280/head
authorMichael Friedrich <michael.friedrich@icinga.com>
Tue, 16 May 2017 10:54:37 +0000 (12:54 +0200)
committerMichael Friedrich <michael.friedrich@icinga.com>
Tue, 23 May 2017 14:00:21 +0000 (16:00 +0200)
refs #5133

lib/base/workqueue.cpp
lib/base/workqueue.hpp

index 04bf2f539e9160e166d4968e7d9ac00371b2fce1..f3d2a6c3256ed9fb1d4a2128c50176c40445ee41 100644 (file)
@@ -33,8 +33,11 @@ boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
 
 WorkQueue::WorkQueue(size_t maxItems, int threadCount)
        : m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
-         m_Processing(0), m_NextTaskID(0)
+         m_Processing(0), m_NextTaskID(0), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
 {
+       /* Initialize logger. */
+       m_StatusTimerTimeout = Utility::GetTime();
+
        m_StatusTimer = new Timer();
        m_StatusTimer->SetInterval(10);
        m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
@@ -192,14 +195,41 @@ void WorkQueue::StatusTimerHandler(void)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
 
-       Log log(LogNotice, "WorkQueue");
+       ASSERT(!m_Name.IsEmpty());
+
+       int pending = m_Tasks.size();
 
-       log << "#" << m_ID;
+       double now = Utility::GetTime();
+       double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
+       double timeToZero = pending / gradient;
 
-       if (!m_Name.IsEmpty())
-               log << " (" << m_Name << ")";
+       String timeInfo;
+
+       if (pending > GetTaskCount(5)) {
+               timeInfo = " empty in ";
+               if (timeToZero < 0)
+                       timeInfo += "infinite time, your task handler isn't able to keep up";
+               else
+                       timeInfo += Utility::FormatDuration(timeToZero);
+       }
 
-       log << " tasks: " << m_Tasks.size();
+       m_PendingTasks = pending;
+       m_PendingTasksTimestamp = now;
+
+       /* Log if there are pending items, or 5 minute timeout is reached. */
+       if (pending > 0 || m_StatusTimerTimeout < now) {
+               Log(LogInformation, "WorkQueue")
+                   << "#" << m_ID << " (" << m_Name << ") "
+                   << "items: " << pending << ", "
+                   << "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
+                   << "(" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
+                   << timeInfo;
+       }
+
+       /* Reschedule next log entry in 5 minutes. */
+       if (m_StatusTimerTimeout < now) {
+               m_StatusTimerTimeout = now + 60 * 5;
+       }
 }
 
 void WorkQueue::WorkerThreadProc(void)
@@ -247,6 +277,8 @@ void WorkQueue::WorkerThreadProc(void)
                   _before_ we re-acquire the mutex */
                task = Task();
 
+               IncreaseTaskCount();
+
                lock.lock();
 
                m_Processing--;
@@ -256,3 +288,16 @@ void WorkQueue::WorkerThreadProc(void)
        }
 }
 
+void WorkQueue::IncreaseTaskCount(void)
+{
+       double now = Utility::GetTime();
+
+       boost::mutex::scoped_lock lock(m_StatsMutex);
+       m_TaskStats.InsertValue(now, 1);
+}
+
+int WorkQueue::GetTaskCount(RingBuffer::SizeType span) const
+{
+       boost::mutex::scoped_lock lock(m_StatsMutex);
+       return m_TaskStats.GetValues(span);
+}
index 5fdb62579431d71566325bb75aa22344e7c5c83e..d9ec53bddf483f60d303021093c21ad17ec39c0f 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "base/i2-base.hpp"
 #include "base/timer.hpp"
+#include "base/ringbuffer.hpp"
 #include <boost/function.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/thread/mutex.hpp>
@@ -93,6 +94,7 @@ public:
        bool IsWorkerThread(void) const;
 
        size_t GetLength(void) const;
+       int GetTaskCount(RingBuffer::SizeType span) const;
 
        void SetExceptionCallback(const ExceptionCallback& callback);
 
@@ -100,6 +102,9 @@ public:
        std::vector<boost::exception_ptr> GetExceptions(void) const;
        void ReportExceptions(const String& facility) const;
 
+protected:
+       void IncreaseTaskCount(void);
+
 private:
        int m_ID;
        String m_Name;
@@ -120,6 +125,12 @@ private:
        ExceptionCallback m_ExceptionCallback;
        std::vector<boost::exception_ptr> m_Exceptions;
        Timer::Ptr m_StatusTimer;
+       double m_StatusTimerTimeout;
+
+       mutable boost::mutex m_StatsMutex;
+       RingBuffer m_TaskStats;
+       int m_PendingTasks;
+       double m_PendingTasksTimestamp;
 
        void WorkerThreadProc(void);
        void StatusTimerHandler(void);