WorkQueue::WorkQueue(size_t maxItems)
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false),
- m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback),
- m_LastStatus(0)
+ m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
{
m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
+
+ m_StatusTimer = make_shared<Timer>();
+ m_StatusTimer->SetInterval(10);
+ m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
+ m_StatusTimer->Start();
}
WorkQueue::~WorkQueue(void)
*/
void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
{
+ WorkItem item;
+ item.Callback = callback;
+ item.AllowInterleaved = allowInterleaved;
+
+ bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
+
boost::mutex::scoped_lock lock(m_Mutex);
ASSERT(!m_Stopped);
- bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
-
if (!wq_thread) {
while (m_Items.size() >= m_MaxItems)
- m_CV.wait(lock);
+ m_CVFull.wait(lock);
}
- WorkItem item;
- item.Callback = callback;
- item.AllowInterleaved = allowInterleaved;
-
m_Items.push_back(item);
if (wq_thread)
ProcessItems(lock, true);
else
- m_CV.notify_all();
+ m_CVEmpty.notify_one();
}
void WorkQueue::Join(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Joined = true;
- m_CV.notify_all();
+ m_CVEmpty.notify_all();
while (!m_Stopped)
- m_CV.wait(lock);
+ m_CVFull.wait(lock);
}
boost::thread::id WorkQueue::GetThreadId(void) const
throw;
}
+void WorkQueue::StatusTimerHandler(void)
+{
+ boost::mutex::scoped_lock lock(m_Mutex);
+
+ Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size()));
+}
+
void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
{
while (!m_Items.empty()) {
- try {
- WorkItem wi = m_Items.front();
-
- if (interleaved && !wi.AllowInterleaved)
- return;
+ WorkItem wi = m_Items.front();
- m_Items.pop_front();
- m_CV.notify_all();
+ if (interleaved && !wi.AllowInterleaved)
+ return;
- double now = Utility::GetTime();
+ m_Items.pop_front();
+ m_CVFull.notify_all();
- if (m_LastStatus + 10 < now) {
- Log(LogInformation, "base", "WQ items: " + Convert::ToString(m_Items.size()));
- m_LastStatus = now;
- }
+ lock.unlock();
- lock.unlock();
+ try {
wi.Callback();
} catch (const std::exception& ex) {
lock.lock();
for (;;) {
while (m_Items.empty() && !m_Joined)
- m_CV.wait(lock);
+ m_CVEmpty.wait(lock);
if (m_Joined)
break;
}
m_Stopped = true;
- m_CV.notify_all();
+ m_CVFull.notify_all();
}
#define WORKQUEUE_H
#include "base/i2-base.h"
+#include "base/timer.h"
#include <deque>
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
public:
typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
- WorkQueue(size_t maxItems = 2500000);
+ WorkQueue(size_t maxItems = 25000);
~WorkQueue(void);
void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
static int m_NextID;
boost::mutex m_Mutex;
- boost::condition_variable m_CV;
+ boost::condition_variable m_CVEmpty;
+ boost::condition_variable m_CVFull;
boost::thread m_Thread;
size_t m_MaxItems;
bool m_Joined;
bool m_Stopped;
std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback;
- double m_LastStatus;
+ Timer::Ptr m_StatusTimer;
void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
void WorkerThreadProc(void);
+ void StatusTimerHandler(void);
static void DefaultExceptionCallback(boost::exception_ptr exp);
};