From b4ab6c8253a8871c3f3fe314fdc5e491dabc473a Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 17 Oct 2013 10:19:17 +0200 Subject: [PATCH] Re-implement WorkQueue item limit. --- lib/base/workqueue.cpp | 62 +++++++++++++++++++++++++++--------------- lib/base/workqueue.h | 13 ++++++--- 2 files changed, 49 insertions(+), 26 deletions(-) diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index ea5704198..6a6d25b24 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -19,17 +19,26 @@ #include "base/workqueue.h" #include "base/utility.h" +#include "base/debug.h" +#include "base/logger_fwd.h" #include +#include using namespace icinga; -WorkQueue::WorkQueue(void) - : m_Executing(false) -{ } +int WorkQueue::m_NextID = 1; + +WorkQueue::WorkQueue(size_t maxItems) + : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), m_Stopped(false) +{ + m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); +} WorkQueue::~WorkQueue(void) { Join(); + + ASSERT(m_Stopped); } /** @@ -40,34 +49,38 @@ void WorkQueue::Enqueue(const WorkCallback& item) { boost::mutex::scoped_lock lock(m_Mutex); + ASSERT(m_Stopped); + + while (m_Items.size() >= m_MaxItems) + m_CV.wait(lock); + m_Items.push_back(item); m_CV.notify_all(); - - if (!m_Executing) { - m_Executing = true; - Utility::QueueAsyncCallback(boost::bind(&WorkQueue::ExecuteItem, this)); - } } void WorkQueue::Join(void) { boost::mutex::scoped_lock lock(m_Mutex); - while (m_Executing || !m_Items.empty()) + m_Joined = true; + while (!m_Stopped) m_CV.wait(lock); } -void WorkQueue::Clear(void) +void WorkQueue::WorkerThreadProc(void) { boost::mutex::scoped_lock lock(m_Mutex); - m_Items.clear(); - m_CV.notify_all(); -} -void WorkQueue::ExecuteItem(void) -{ - boost::mutex::scoped_lock lock(m_Mutex); + std::ostringstream idbuf; + idbuf << "WQ #" << m_ID; + Utility::SetThreadName(idbuf.str()); + + for (;;) { + while (m_Items.empty() && !m_Joined) + m_CV.wait(lock); + + if (m_Joined) + break; - while (!m_Items.empty()) { try { WorkCallback wi = m_Items.front(); m_Items.pop_front(); @@ -75,14 +88,19 @@ void WorkQueue::ExecuteItem(void) lock.unlock(); wi(); - lock.lock(); + } catch (const std::exception& ex) { + std::ostringstream msgbuf; + msgbuf << "Exception thrown in workqueue handler: " << std::endl + << boost::diagnostic_information(ex); + + Log(LogCritical, "base", msgbuf.str()); } catch (...) { - lock.lock(); - m_Executing = false; - throw; + Log(LogCritical, "base", "Exception of unknown type thrown in workqueue handler."); } + + lock.lock(); } - m_Executing = false; + m_Stopped = true; m_CV.notify_all(); } diff --git a/lib/base/workqueue.h b/lib/base/workqueue.h index 4be814fe2..b1b4a2ccb 100644 --- a/lib/base/workqueue.h +++ b/lib/base/workqueue.h @@ -23,6 +23,7 @@ #include "base/i2-base.h" #include #include +#include #include #include @@ -39,21 +40,25 @@ class I2_BASE_API WorkQueue public: typedef boost::function WorkCallback; - WorkQueue(void); + WorkQueue(size_t maxItems = 25000); ~WorkQueue(void); void Enqueue(const WorkCallback& item); void Join(void); - void Clear(void); private: + int m_ID; + static int m_NextID; + boost::mutex m_Mutex; boost::condition_variable m_CV; + boost::thread m_Thread; size_t m_MaxItems; - bool m_Executing; + bool m_Joined; + bool m_Stopped; std::deque m_Items; - void ExecuteItem(void); + void WorkerThreadProc(void); }; } -- 2.40.0