]> granicus.if.org Git - icinga2/commitdiff
Improve WorkQueue performance
authorGunnar Beutner <gunnar@beutner.name>
Thu, 12 Dec 2013 05:30:11 +0000 (06:30 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Fri, 13 Dec 2013 13:08:06 +0000 (14:08 +0100)
Refs #5327

lib/base/workqueue.cpp
lib/base/workqueue.h
lib/config/configitem.cpp
lib/config/configitem.h
lib/icinga/host.cpp
lib/icinga/service-downtime.cpp
lib/icinga/service-notification.cpp
lib/icinga/service.cpp
lib/icinga/service.h

index 6ed2d6c3a7d3166dfd8c9c377d33cbc5168bfa72..4e5ef9f793bb3ac4990d85e7411ae3cf3193f22c 100644 (file)
@@ -23,6 +23,7 @@
 #include "base/logger_fwd.h"
 #include "base/convert.h"
 #include <boost/bind.hpp>
+#include <boost/foreach.hpp>
 
 using namespace icinga;
 
@@ -30,7 +31,7 @@ int WorkQueue::m_NextID = 1;
 
 WorkQueue::WorkQueue(size_t maxItems)
        : m_ID(m_NextID++), m_MaxItems(maxItems), m_Stopped(false),
-         m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
+         m_Processing(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
 {
        m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
 
@@ -47,16 +48,24 @@ WorkQueue::~WorkQueue(void)
 
 /**
  * Enqueues a work item. Work items are guaranteed to be executed in the order
- * they were enqueued in.
+ * they were enqueued in except when allowInterleaved is true in which case
+ * the new work item might be run immediately if it's being enqueued from
+ * within the WorkQueue thread.
  */
 void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
 {
+       bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
+
+       if (wq_thread && allowInterleaved) {
+               callback();
+
+               return;
+       }
+
        WorkItem item;
        item.Callback = callback;
        item.AllowInterleaved = allowInterleaved;
 
-       bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
-
        boost::mutex::scoped_lock lock(m_Mutex);
 
        if (!wq_thread) {
@@ -66,9 +75,7 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
 
        m_Items.push_back(item);
 
-       if (wq_thread)
-               ProcessItems(lock, true);
-       else if (m_Items.size() == 1)
+       if (m_Items.size() == 1)
                m_CVEmpty.notify_all();
 }
 
@@ -76,7 +83,7 @@ void WorkQueue::Join(bool stop)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
 
-       while (!m_Items.empty())
+       while (m_Processing || !m_Items.empty())
                m_CVStarved.wait(lock);
 
        if (stop) {
@@ -112,59 +119,54 @@ void WorkQueue::StatusTimerHandler(void)
        Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size()));
 }
 
-void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
+void WorkQueue::WorkerThreadProc(void)
 {
-       while (!m_Items.empty()) {
-               WorkItem wi = m_Items.front();
-
-               if (interleaved && !wi.AllowInterleaved)
-                       return;
+       std::ostringstream idbuf;
+       idbuf << "WQ #" << m_ID;
+       Utility::SetThreadName(idbuf.str());
 
-               lock.unlock();
+       boost::mutex::scoped_lock lock(m_Mutex);
 
-               try {
-                       wi.Callback();
-               } catch (const std::exception&) {
-                       lock.lock();
+       for (;;) {
+               while (m_Items.empty() && !m_Stopped)
+                       m_CVEmpty.wait(lock);
 
-                       ExceptionCallback callback = m_ExceptionCallback;
+               if (m_Stopped)
+                       break;
 
-                       lock.unlock();
+               std::deque<WorkItem> items;
+               m_Items.swap(items);
 
-                       callback(boost::current_exception());
-               }
+               if (items.size() >= m_MaxItems)
+                       m_CVFull.notify_all();
 
-               lock.lock();
+               m_Processing = true;
 
-               m_Items.pop_front();
+               lock.unlock();
 
-               if (m_Items.size() + 1 == m_MaxItems)
-                       m_CVFull.notify_one();
-       }
+               BOOST_FOREACH(WorkItem& wi, items) {
+                       try {
+                               wi.Callback();
+                       }
+                       catch (const std::exception&) {
+                               lock.lock();
 
-       m_CVStarved.notify_all();
-}
+                               ExceptionCallback callback = m_ExceptionCallback;
 
-void WorkQueue::WorkerThreadProc(void)
-{
-       boost::mutex::scoped_lock lock(m_Mutex);
+                               lock.unlock();
 
-       std::ostringstream idbuf;
-       idbuf << "WQ #" << m_ID;
-       Utility::SetThreadName(idbuf.str());
+                               callback(boost::current_exception());
+                       }
+               }
 
-       for (;;) {
-               while (m_Items.empty() && !m_Stopped)
-                       m_CVEmpty.wait(lock);
+               lock.lock();
 
-               if (m_Stopped)
-                       break;
+               m_Processing = false;
 
-               ProcessItems(lock, false);
+               m_CVStarved.notify_all();
        }
 }
 
-
 ParallelWorkQueue::ParallelWorkQueue(void)
        : m_QueueCount(boost::thread::hardware_concurrency()),
          m_Queues(new WorkQueue[m_QueueCount]),
index 4e92d2b2751ee922d5fa2341122537b018f93965..10015ff5a9ec47d5db5bcf85f95fb9a22b66d1c0 100644 (file)
@@ -36,7 +36,6 @@ typedef boost::function<void (void)> WorkCallback;
 
 struct WorkItem
 {
-
        WorkCallback Callback;
        bool AllowInterleaved;
 };
@@ -72,11 +71,11 @@ private:
        boost::thread m_Thread;
        size_t m_MaxItems;
        bool m_Stopped;
+       bool m_Processing;
        std::deque<WorkItem> m_Items;
        ExceptionCallback m_ExceptionCallback;
        Timer::Ptr m_StatusTimer;
 
-       void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
        void WorkerThreadProc(void);
        void StatusTimerHandler(void);
 
index 7b03b805a90a0158bd938d6baf45de0babb7e43e..6d2a865611c5cc5071b0ed41cc994aa04815b50b 100644 (file)
@@ -104,7 +104,7 @@ ExpressionList::Ptr ConfigItem::GetExpressionList(void) const
 
 void ConfigItem::Link(void)
 {
-       ObjectLock olock(this);
+       ASSERT(OwnsLock());
 
        if (m_LinkedExpressionList)
                return;
@@ -132,6 +132,8 @@ void ConfigItem::Link(void)
 
 ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void)
 {
+       ASSERT(OwnsLock());
+
        if (!m_LinkedExpressionList)
                Link();
 
@@ -140,6 +142,10 @@ ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void)
 
 Dictionary::Ptr ConfigItem::GetProperties(void)
 {
+       ASSERT(!OwnsLock());
+
+       ObjectLock olock(this);
+
        if (!m_Properties) {
                m_Properties = make_shared<Dictionary>();
                GetLinkedExpressionList()->Execute(m_Properties);
@@ -184,19 +190,17 @@ DynamicObject::Ptr ConfigItem::Commit(void)
        return dobj;
 }
 
-DynamicObject::Ptr ConfigItem::GetObject(void) const
-{
-       return m_Object;
-}
-
 /**
  * Registers the configuration item.
  */
 void ConfigItem::Register(void)
 {
+       std::pair<String, String> key = std::make_pair(m_Type, m_Name);
+       ConfigItem::Ptr self = GetSelf();
+
        boost::mutex::scoped_lock lock(m_Mutex);
 
-       m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
+       m_Items[key] = self;
 }
 
 /**
@@ -208,11 +212,14 @@ void ConfigItem::Register(void)
  */
 ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
 {
-       boost::mutex::scoped_lock lock(m_Mutex);
-
+       std::pair<String, String> key = std::make_pair(type, name);
        ConfigItem::ItemMap::iterator it;
 
-       it = m_Items.find(std::make_pair(type, name));
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               it = m_Items.find(key);
+       }
 
        if (it != m_Items.end())
                return it->second;
@@ -222,11 +229,14 @@ ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
 
 bool ConfigItem::HasObject(const String& type, const String& name)
 {
-       boost::mutex::scoped_lock lock(m_Mutex);
-
+       std::pair<String, String> key = std::make_pair(type, name);
        ConfigItem::ItemMap::iterator it;
 
-       it = m_Items.find(std::make_pair(type, name));
+       {
+               boost::mutex::scoped_lock lock(m_Mutex);
+
+               it = m_Items.find(key);
+       }
 
        return (it != m_Items.end());
 }
@@ -280,7 +290,7 @@ bool ConfigItem::ActivateItems(bool validateOnly)
        
        std::vector<DynamicObject::Ptr> objects;
        BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
-               DynamicObject::Ptr object = kv.second->GetObject();
+               DynamicObject::Ptr object = kv.second->m_Object;
 
                if (object)
                        objects.push_back(object);
@@ -342,5 +352,7 @@ bool ConfigItem::ActivateItems(bool validateOnly)
 
 void ConfigItem::DiscardItems(void)
 {
+       boost::mutex::scoped_lock lock(m_Mutex);
+
        m_Items.clear();
 }
index 8adf4b6b816fe38799a856d6ec6b6a286e384eb0..bd69daff4a64150be6829a57fe7c4ee8b6b9e652 100644 (file)
@@ -47,7 +47,6 @@ public:
 
        std::vector<ConfigItem::Ptr> GetParents(void) const;
 
-       void Link(void);
        ExpressionList::Ptr GetLinkedExpressionList(void);
        Dictionary::Ptr GetProperties(void);
 
@@ -61,13 +60,12 @@ public:
        static bool HasObject(const String& type, const String& name);
 
        void ValidateItem(void);
-
-        DynamicObject::Ptr GetObject(void) const;
         
        static bool ActivateItems(bool validateOnly);
        static void DiscardItems(void);
 
 private:
+       void Link(void);
        ExpressionList::Ptr GetExpressionList(void) const;
 
        String m_Type; /**< The object type. */
index f38eea9c7fbdb7afd0824f74a557fc1d5dad65f0..adf476771cd0deb457c096906ccf822e9743d4dd 100644 (file)
@@ -138,17 +138,13 @@ void Host::UpdateSlaveServices(void)
 {
        ASSERT(!OwnsLock());
 
-       ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName());
-
-       /* Don't create slave services unless we own this object */
-       if (!item)
-               return;
-
        Dictionary::Ptr service_descriptions = GetServiceDescriptions();
 
-       if (!service_descriptions)
+       if (!service_descriptions ||service_descriptions->GetLength() == 0)
                return;
 
+       ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName());
+
        ObjectLock olock(service_descriptions);
        BOOST_FOREACH(const Dictionary::Pair& kv, service_descriptions) {
                std::ostringstream namebuf;
index 9596c56a5c0fcf6fb23804c9fc883450902c13c4..793452799a13418d7ae56103eec0a276c739b6dc 100644 (file)
@@ -221,12 +221,10 @@ Downtime::Ptr Service::GetDowntimeByID(const String& id)
 
 void Service::StartDowntimesExpiredTimer(void)
 {
-        if (!l_DowntimesExpireTimer) {
-               l_DowntimesExpireTimer = make_shared<Timer>();
-               l_DowntimesExpireTimer->SetInterval(60);
-               l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler));
-               l_DowntimesExpireTimer->Start();
-        }
+       l_DowntimesExpireTimer = make_shared<Timer>();
+       l_DowntimesExpireTimer->SetInterval(60);
+       l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler));
+       l_DowntimesExpireTimer->Start();
 }
 
 void Service::AddDowntimesToCache(void)
@@ -318,18 +316,14 @@ int Service::GetDowntimeDepth(void) const
 
 void Service::UpdateSlaveScheduledDowntimes(void)
 {
-       ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
-
-       /* Don't create slave scheduled downtimes unless we own this object */
-       if (!item)
-               return;
-
        /* Service scheduled downtime descs */
        Dictionary::Ptr descs = GetScheduledDowntimeDescriptions();
 
-       if (!descs)
+       if (!descs || descs->GetLength() == 0)
                return;
 
+       ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
+
        ObjectLock olock(descs);
 
        BOOST_FOREACH(const Dictionary::Pair& kv, descs) {
index ac176fa751fc51c7e610cda92bc7953008ac85ee..b4808659c1ca80f247ef9404e73d211ffbf6ba1b 100644 (file)
@@ -95,18 +95,14 @@ void Service::RemoveNotification(const Notification::Ptr& notification)
 
 void Service::UpdateSlaveNotifications(void)
 {
-       ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
-
-       /* Don't create slave notifications unless we own this object */
-       if (!item)
-               return;
-
        /* Service notification descs */
        Dictionary::Ptr descs = GetNotificationDescriptions();
 
-       if (!descs)
+       if (!descs || descs->GetLength() == 0)
                return;
 
+       ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
+
        ObjectLock olock(descs);
 
        BOOST_FOREACH(const Dictionary::Pair& kv, descs) {
index 482892758920a08ca92a59b6fdf68f0452ac3779..be50f34f9c1fcf2f2945ceb12b7c788cb1feb728 100644 (file)
@@ -28,6 +28,7 @@
 #include "base/objectlock.h"
 #include "base/convert.h"
 #include "base/utility.h"
+#include "base/initialize.h"
 #include <boost/foreach.hpp>
 #include <boost/bind/apply.hpp>
 
@@ -38,16 +39,14 @@ REGISTER_TYPE(Service);
 boost::signals2::signal<void (const Service::Ptr&, const String&, const String&, AcknowledgementType, double, const String&)> Service::OnAcknowledgementSet;
 boost::signals2::signal<void (const Service::Ptr&, const String&)> Service::OnAcknowledgementCleared;
 
+INITIALIZE_ONCE(&Service::StartDowntimesExpiredTimer);
+
 Service::Service(void)
        : m_CheckRunning(false)
 { }
 
 void Service::Start(void)
 {
-       VERIFY(GetHost());
-
-       StartDowntimesExpiredTimer();
-
        double now = Utility::GetTime();
 
        if (GetNextCheck() < now + 300)
index c6cd6b8c9b7b88fd67fba5475b03aaf90956641f..883368751bc6c9cf8e750ddadcd9a3297e0fa3cb 100644 (file)
@@ -204,7 +204,7 @@ public:
        static Service::Ptr GetOwnerByDowntimeID(const String& id);
        static Downtime::Ptr GetDowntimeByID(const String& id);
 
-       void StartDowntimesExpiredTimer(void);
+       static void StartDowntimesExpiredTimer(void);
 
        bool IsInDowntime(void) const;
        bool IsAcknowledged(void);