WorkQueue::WorkQueue(size_t maxItems, int threadCount)
: m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
- m_Processing(0), m_NextTaskID(0)
+ m_Processing(0), m_NextTaskID(0), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
{
+ /* Initialize logger. */
+ m_StatusTimerTimeout = Utility::GetTime();
+
m_StatusTimer = new Timer();
m_StatusTimer->SetInterval(10);
m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
{
boost::mutex::scoped_lock lock(m_Mutex);
- Log log(LogNotice, "WorkQueue");
+ ASSERT(!m_Name.IsEmpty());
+
+ int pending = m_Tasks.size();
- log << "#" << m_ID;
+ double now = Utility::GetTime();
+ double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
+ double timeToZero = pending / gradient;
- if (!m_Name.IsEmpty())
- log << " (" << m_Name << ")";
+ String timeInfo;
+
+ if (pending > GetTaskCount(5)) {
+ timeInfo = " empty in ";
+ if (timeToZero < 0)
+ timeInfo += "infinite time, your task handler isn't able to keep up";
+ else
+ timeInfo += Utility::FormatDuration(timeToZero);
+ }
- log << " tasks: " << m_Tasks.size();
+ m_PendingTasks = pending;
+ m_PendingTasksTimestamp = now;
+
+ /* Log if there are pending items, or 5 minute timeout is reached. */
+ if (pending > 0 || m_StatusTimerTimeout < now) {
+ Log(LogInformation, "WorkQueue")
+ << "#" << m_ID << " (" << m_Name << ") "
+ << "items: " << pending << ", "
+ << "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
+ << "(" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
+ << timeInfo;
+ }
+
+ /* Reschedule next log entry in 5 minutes. */
+ if (m_StatusTimerTimeout < now) {
+ m_StatusTimerTimeout = now + 60 * 5;
+ }
}
void WorkQueue::WorkerThreadProc(void)
_before_ we re-acquire the mutex */
task = Task();
+ IncreaseTaskCount();
+
lock.lock();
m_Processing--;
}
}
+void WorkQueue::IncreaseTaskCount(void)
+{
+ double now = Utility::GetTime();
+
+ boost::mutex::scoped_lock lock(m_StatsMutex);
+ m_TaskStats.InsertValue(now, 1);
+}
+
+int WorkQueue::GetTaskCount(RingBuffer::SizeType span) const
+{
+ boost::mutex::scoped_lock lock(m_StatsMutex);
+ return m_TaskStats.GetValues(span);
+}
#include "base/i2-base.hpp"
#include "base/timer.hpp"
+#include "base/ringbuffer.hpp"
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
bool IsWorkerThread(void) const;
size_t GetLength(void) const;
+ int GetTaskCount(RingBuffer::SizeType span) const;
void SetExceptionCallback(const ExceptionCallback& callback);
std::vector<boost::exception_ptr> GetExceptions(void) const;
void ReportExceptions(const String& facility) const;
+protected:
+ void IncreaseTaskCount(void);
+
private:
int m_ID;
String m_Name;
ExceptionCallback m_ExceptionCallback;
std::vector<boost::exception_ptr> m_Exceptions;
Timer::Ptr m_StatusTimer;
+ double m_StatusTimerTimeout;
+
+ mutable boost::mutex m_StatsMutex;
+ RingBuffer m_TaskStats;
+ int m_PendingTasks;
+ double m_PendingTasksTimestamp;
void WorkerThreadProc(void);
void StatusTimerHandler(void);