]> granicus.if.org Git - icinga2/commitdiff
Fix WorkQueue lock contention.
authorGunnar Beutner <gunnar@beutner.name>
Fri, 29 Nov 2013 16:07:04 +0000 (17:07 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Mon, 2 Dec 2013 07:38:53 +0000 (08:38 +0100)
Fixes #5245

lib/base/workqueue.cpp
lib/base/workqueue.h

index 67e51c397c67a5fd64155d2f0f3a8226cfdeef7b..f84b753c8abec0e35ce3c2346be8a0ce19d464ed 100644 (file)
@@ -30,10 +30,14 @@ int WorkQueue::m_NextID = 1;
 
 WorkQueue::WorkQueue(size_t maxItems)
        : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false),
-         m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback),
-         m_LastStatus(0)
+         m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
 {
        m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
+
+       m_StatusTimer = make_shared<Timer>();
+       m_StatusTimer->SetInterval(10);
+       m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
+       m_StatusTimer->Start();
 }
 
 WorkQueue::~WorkQueue(void)
@@ -49,37 +53,37 @@ WorkQueue::~WorkQueue(void)
  */
 void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
 {
+       WorkItem item;
+       item.Callback = callback;
+       item.AllowInterleaved = allowInterleaved;
+
+       bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
+
        boost::mutex::scoped_lock lock(m_Mutex);
 
        ASSERT(!m_Stopped);
 
-       bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
-
        if (!wq_thread) {
                while (m_Items.size() >= m_MaxItems)
-                       m_CV.wait(lock);
+                       m_CVFull.wait(lock);
        }
 
-       WorkItem item;
-       item.Callback = callback;
-       item.AllowInterleaved = allowInterleaved;
-
        m_Items.push_back(item);
 
        if (wq_thread)
                ProcessItems(lock, true);
        else
-               m_CV.notify_all();
+               m_CVEmpty.notify_one();
 }
 
 void WorkQueue::Join(void)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
        m_Joined = true;
-       m_CV.notify_all();
+       m_CVEmpty.notify_all();
 
        while (!m_Stopped)
-               m_CV.wait(lock);
+               m_CVFull.wait(lock);
 }
 
 boost::thread::id WorkQueue::GetThreadId(void) const
@@ -99,26 +103,27 @@ void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp)
        throw;
 }
 
+void WorkQueue::StatusTimerHandler(void)
+{
+       boost::mutex::scoped_lock lock(m_Mutex);
+
+       Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size()));
+}
+
 void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
 {
        while (!m_Items.empty()) {
-               try {
-                       WorkItem wi = m_Items.front();
-
-                       if (interleaved && !wi.AllowInterleaved)
-                               return;
+               WorkItem wi = m_Items.front();
 
-                       m_Items.pop_front();
-                       m_CV.notify_all();
+               if (interleaved && !wi.AllowInterleaved)
+                       return;
 
-                       double now = Utility::GetTime();
+               m_Items.pop_front();
+               m_CVFull.notify_all();
 
-                       if (m_LastStatus + 10 < now) {
-                               Log(LogInformation, "base", "WQ items: " + Convert::ToString(m_Items.size()));
-                               m_LastStatus = now;
-                       }
+               lock.unlock();
 
-                       lock.unlock();
+               try {
                        wi.Callback();
                } catch (const std::exception& ex) {
                        lock.lock();
@@ -144,7 +149,7 @@ void WorkQueue::WorkerThreadProc(void)
 
        for (;;) {
                while (m_Items.empty() && !m_Joined)
-                       m_CV.wait(lock);
+                       m_CVEmpty.wait(lock);
 
                if (m_Joined)
                        break;
@@ -153,5 +158,5 @@ void WorkQueue::WorkerThreadProc(void)
        }
 
        m_Stopped = true;
-       m_CV.notify_all();
+       m_CVFull.notify_all();
 }
index 5cfce709f4ca4aaeb9d3c4ea05dcbf3322426a1b..56b2c8df628a69905c1bfd34e979767ab7004504 100644 (file)
@@ -21,6 +21,7 @@
 #define WORKQUEUE_H
 
 #include "base/i2-base.h"
+#include "base/timer.h"
 #include <deque>
 #include <boost/function.hpp>
 #include <boost/thread/thread.hpp>
@@ -50,7 +51,7 @@ class I2_BASE_API WorkQueue
 public:
        typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
 
-       WorkQueue(size_t maxItems = 2500000);
+       WorkQueue(size_t maxItems = 25000);
        ~WorkQueue(void);
 
        void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
@@ -65,17 +66,19 @@ private:
        static int m_NextID;
 
        boost::mutex m_Mutex;
-       boost::condition_variable m_CV;
+       boost::condition_variable m_CVEmpty;
+       boost::condition_variable m_CVFull;
        boost::thread m_Thread;
        size_t m_MaxItems;
        bool m_Joined;
        bool m_Stopped;
        std::deque<WorkItem> m_Items;
        ExceptionCallback m_ExceptionCallback;
-       double m_LastStatus;
+       Timer::Ptr m_StatusTimer;
 
        void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
        void WorkerThreadProc(void);
+       void StatusTimerHandler(void);
 
        static void DefaultExceptionCallback(boost::exception_ptr exp);
 };