]> granicus.if.org Git - icinga2/commitdiff
Re-implement WorkQueue item limit.
authorGunnar Beutner <gunnar.beutner@netways.de>
Thu, 17 Oct 2013 08:19:17 +0000 (10:19 +0200)
committerGunnar Beutner <gunnar.beutner@netways.de>
Thu, 17 Oct 2013 08:19:17 +0000 (10:19 +0200)
lib/base/workqueue.cpp
lib/base/workqueue.h

index ea5704198cbff13ce69a296932034bb2f28a43bd..6a6d25b24d679d7ebbb7df14a32f390709dd77b4 100644 (file)
 
 #include "base/workqueue.h"
 #include "base/utility.h"
+#include "base/debug.h"
+#include "base/logger_fwd.h"
 #include <boost/bind.hpp>
+#include <boost/exception/diagnostic_information.hpp>
 
 using namespace icinga;
 
-WorkQueue::WorkQueue(void)
-       : m_Executing(false)
-{ }
+int WorkQueue::m_NextID = 1;
+
+WorkQueue::WorkQueue(size_t maxItems)
+       : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), m_Stopped(false)
+{
+       m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
+}
 
 WorkQueue::~WorkQueue(void)
 {
        Join();
+
+       ASSERT(m_Stopped);
 }
 
 /**
@@ -40,34 +49,38 @@ void WorkQueue::Enqueue(const WorkCallback& item)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
 
+       ASSERT(m_Stopped);
+
+       while (m_Items.size() >= m_MaxItems)
+               m_CV.wait(lock);
+
        m_Items.push_back(item);
        m_CV.notify_all();
-
-       if (!m_Executing) {
-               m_Executing = true;
-               Utility::QueueAsyncCallback(boost::bind(&WorkQueue::ExecuteItem, this));
-       }
 }
 
 void WorkQueue::Join(void)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
-       while (m_Executing || !m_Items.empty())
+       m_Joined = true;
+       while (!m_Stopped)
                m_CV.wait(lock);
 }
 
-void WorkQueue::Clear(void)
+void WorkQueue::WorkerThreadProc(void)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
-       m_Items.clear();
-       m_CV.notify_all();
-}
 
-void WorkQueue::ExecuteItem(void)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
+       std::ostringstream idbuf;
+       idbuf << "WQ #" << m_ID;
+       Utility::SetThreadName(idbuf.str());
+
+       for (;;) {
+               while (m_Items.empty() && !m_Joined)
+                       m_CV.wait(lock);
+
+               if (m_Joined)
+                       break;
 
-       while (!m_Items.empty()) {
                try {
                        WorkCallback wi = m_Items.front();
                        m_Items.pop_front();
@@ -75,14 +88,19 @@ void WorkQueue::ExecuteItem(void)
 
                        lock.unlock();
                        wi();
-                       lock.lock();
+               } catch (const std::exception& ex) {
+                       std::ostringstream msgbuf;
+                       msgbuf << "Exception thrown in workqueue handler: " << std::endl
+                              << boost::diagnostic_information(ex);
+
+                       Log(LogCritical, "base", msgbuf.str());
                } catch (...) {
-                       lock.lock();
-                       m_Executing = false;
-                       throw;
+                       Log(LogCritical, "base", "Exception of unknown type thrown in workqueue handler.");
                }
+
+               lock.lock();
        }
 
-       m_Executing = false;
+       m_Stopped = true;
        m_CV.notify_all();
 }
index 4be814fe26c7167dae0f18302a38fde0239f4246..b1b4a2ccb7b3c34803e920b03fb15dd9ab345e32 100644 (file)
@@ -23,6 +23,7 @@
 #include "base/i2-base.h"
 #include <deque>
 #include <boost/function.hpp>
+#include <boost/thread/thread.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 
@@ -39,21 +40,25 @@ class I2_BASE_API WorkQueue
 public:
        typedef boost::function<void (void)> WorkCallback;
 
-       WorkQueue(void);
+       WorkQueue(size_t maxItems = 25000);
        ~WorkQueue(void);
 
        void Enqueue(const WorkCallback& item);
        void Join(void);
-       void Clear(void);
 
 private:
+       int m_ID;
+       static int m_NextID;
+
        boost::mutex m_Mutex;
        boost::condition_variable m_CV;
+       boost::thread m_Thread;
        size_t m_MaxItems;
-       bool m_Executing;
+       bool m_Joined;
+       bool m_Stopped;
        std::deque<WorkCallback> m_Items;
 
-       void ExecuteItem(void);
+       void WorkerThreadProc(void);
 };
 
 }