]> granicus.if.org Git - icinga2/commitdiff
Implement support for priorities in the WorkQueue class
authorGunnar Beutner <gunnar@beutner.name>
Thu, 10 Dec 2015 15:54:43 +0000 (16:54 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Thu, 10 Dec 2015 16:06:00 +0000 (17:06 +0100)
fixes #8714

lib/base/configobject.cpp
lib/base/workqueue.cpp
lib/base/workqueue.hpp
lib/db_ido/dbconnection.cpp
lib/db_ido/dbquery.hpp
lib/db_ido_mysql/idomysqlconnection.cpp
lib/db_ido_pgsql/idopgsqlconnection.cpp
lib/remote/apilistener.cpp

index 00b4af1bbc93cff117ca15bf36c337cfa2bcb6b7..fbf1d95fd3bbe3e341525332098f34fa569bf01d 100644 (file)
@@ -415,8 +415,6 @@ void ConfigObject::Deactivate(bool runtimeRemoved)
 {
        CONTEXT("Deactivating object '" + GetName() + "' of type '" + GetType()->GetName() + "'");
 
-       SetAuthority(false);
-
        {
                ObjectLock olock(this);
 
@@ -426,6 +424,8 @@ void ConfigObject::Deactivate(bool runtimeRemoved)
                SetActive(false, true);
        }
 
+       SetAuthority(false);
+
        Stop(runtimeRemoved);
 
        ASSERT(GetStopCalled());
@@ -471,10 +471,10 @@ void ConfigObject::SetAuthority(bool authority)
                ASSERT(GetResumeCalled());
                SetPaused(false);
        } else if (!authority && !GetPaused()) {
+               SetPaused(true);
                SetPauseCalled(false);
                Pause();
                ASSERT(GetPauseCalled());
-               SetPaused(true);
        }
 }
 
index 87200656c0de5ffe16e6216a7d8be34eae3b387c..36d9a32e213cf47a72e2b24e6cb983dff9d30c4f 100644 (file)
@@ -55,12 +55,13 @@ WorkQueue::~WorkQueue(void)
  * allowInterleaved is true in which case the new task might be run
  * immediately if it's being enqueued from within the WorkQueue thread.
  */
-void WorkQueue::Enqueue(const Task& task, bool allowInterleaved)
+void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueuePriority priority,
+    bool allowInterleaved)
 {
        bool wq_thread = IsWorkerThread();
 
        if (wq_thread && allowInterleaved) {
-               task();
+               function();
 
                return;
        }
@@ -80,7 +81,7 @@ void WorkQueue::Enqueue(const Task& task, bool allowInterleaved)
                        m_CVFull.wait(lock);
        }
 
-       m_Tasks.push_back(task);
+       m_Tasks.push(Task(function, priority));
 
        m_CVEmpty.notify_one();
 }
@@ -200,15 +201,15 @@ void WorkQueue::WorkerThreadProc(void)
                if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
                        m_CVFull.notify_all();
 
-               Task task = m_Tasks.front();
-               m_Tasks.pop_front();
+               Task task = m_Tasks.top();
+               m_Tasks.pop();
 
                m_Processing++;
 
                lock.unlock();
 
                try {
-                       task();
+                       task.Function();
                } catch (const std::exception&) {
                        lock.lock();
 
index c6088b74efd4130022f7b99e9b991df0c49478a1..75d93e1c3dbda8dce551c64ddea7c296d3874f42 100644 (file)
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
 #include <boost/exception_ptr.hpp>
+#include <queue>
 #include <deque>
 
 namespace icinga
 {
 
-typedef boost::function<void (void)> Task;
+enum WorkQueuePriority
+{
+       PriorityLow,
+       PriorityNormal,
+       PriorityHigh
+};
+
+struct Task
+{
+       Task(void)
+           : Priority(PriorityNormal)
+       { }
+
+       Task(const boost::function<void (void)>& function, WorkQueuePriority priority)
+           : Function(function), Priority(priority)
+       { }
+
+       boost::function<void (void)> Function;
+       WorkQueuePriority Priority;
+};
+
+inline bool operator<(const Task& a, const Task& b)
+{
+       return a.Priority < b.Priority;
+}
 
 /**
  * A workqueue.
@@ -47,7 +72,8 @@ public:
        WorkQueue(size_t maxItems = 0, int threadCount = 1);
        ~WorkQueue(void);
 
-       void Enqueue(const Task& task, bool allowInterleaved = false);
+       void Enqueue(const boost::function<void (void)>& function, WorkQueuePriority priority = PriorityNormal,
+           bool allowInterleaved = false);
        void Join(bool stop = false);
 
        bool IsWorkerThread(void) const;
@@ -74,7 +100,7 @@ private:
        size_t m_MaxItems;
        bool m_Stopped;
        int m_Processing;
-       std::deque<Task> m_Tasks;
+       std::priority_queue<Task, std::deque<Task> > m_Tasks;
        ExceptionCallback m_ExceptionCallback;
        std::vector<boost::exception_ptr> m_Exceptions;
        Timer::Ptr m_StatusTimer;
index 1bbcf71d8605b7a167ab8a3c6523eb5d8cbba02a..967815b6c46b48b4d29a8016eef41e40376cf271 100644 (file)
@@ -97,6 +97,9 @@ void DbConnection::Pause(void)
        query1.Fields = new Dictionary();
        query1.Fields->Set("instance_id", 0); /* DbConnection class fills in real ID */
        query1.Fields->Set("program_end_time", DbValue::FromTimestamp(Utility::GetTime()));
+
+       query1.Priority = PriorityHigh;
+
        ExecuteQuery(query1);
 
        NewTransaction();
@@ -134,6 +137,7 @@ void DbConnection::ProgramStatusHandler(void)
        query1.Category = DbCatProgramStatus;
        query1.WhereCriteria = new Dictionary();
        query1.WhereCriteria->Set("instance_id", 0);  /* DbConnection class fills in real ID */
+       query1.Priority = PriorityHigh;
        DbObject::OnQuery(query1);
 
        DbQuery query2;
@@ -160,6 +164,7 @@ void DbConnection::ProgramStatusHandler(void)
        query2.Fields->Set("event_handlers_enabled", (IcingaApplication::GetInstance()->GetEnableEventHandlers() ? 1 : 0));
        query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0));
        query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0));
+       query2.Priority = PriorityHigh;
        DbObject::OnQuery(query2);
 
        DbQuery query3;
index dcab8771bd0b23a6ec47b3bfdbd0cf4bda5c27c3..60ce357367f4a14a0ce2f450175f2eb6af8a392f 100644 (file)
@@ -70,11 +70,12 @@ struct I2_DB_IDO_API DbQuery
        intrusive_ptr<CustomVarObject> NotificationObject;
        bool ConfigUpdate;
        bool StatusUpdate;
+       WorkQueuePriority Priority;
 
        static void StaticInitialize(void);
 
        DbQuery(void)
-               : Type(0), Category(DbCatInvalid), ConfigUpdate(false), StatusUpdate(false)
+               : Type(0), Category(DbCatInvalid), ConfigUpdate(false), StatusUpdate(false), Priority(PriorityLow)
        { }
 };
 
index e9ddea685bd7adc07ca8e774f45c1751fc6b04f4..dca7df260abc528fe339b20388f1d2203665bf70 100644 (file)
@@ -95,7 +95,7 @@ void IdoMysqlConnection::Pause(void)
 
        DbConnection::Pause();
 
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this), PriorityHigh);
        m_QueryQueue.Join();
 }
 
@@ -138,8 +138,8 @@ void IdoMysqlConnection::TxTimerHandler(void)
 
 void IdoMysqlConnection::NewTransaction(void)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this));
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this), PriorityHigh);
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true), PriorityHigh);
 }
 
 void IdoMysqlConnection::InternalNewTransaction(void)
@@ -155,13 +155,16 @@ void IdoMysqlConnection::InternalNewTransaction(void)
 
 void IdoMysqlConnection::ReconnectTimerHandler(void)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this), PriorityLow);
 }
 
 void IdoMysqlConnection::Reconnect(void)
 {
        AssertOnWorkQueue();
 
+       if (!IsActive())
+               return;
+
        CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'");
 
        m_SessionToken = static_cast<int>(Utility::GetTime());
@@ -406,7 +409,7 @@ void IdoMysqlConnection::AsyncQuery(const String& query, const boost::function<v
        if (m_AsyncQueries.size() > 500)
                FinishAsyncQueries(true);
        else
-               m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false));
+               m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false), PriorityLow);
 }
 
 void IdoMysqlConnection::FinishAsyncQueries(bool force)
@@ -625,7 +628,7 @@ void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
 
 void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
 }
 
 void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
@@ -659,7 +662,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
 
 void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
 }
 
 void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
@@ -753,7 +756,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
 {
        ASSERT(query.Category != DbCatInvalid);
 
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
 }
 
 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
@@ -781,7 +784,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
 
                BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) {
                        if (!FieldToEscapedString(kv.first, kv.second, &value)) {
-                               m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL));
+                               m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
                                return;
                        }
 
@@ -902,7 +905,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool
 
 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
 }
 
 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
index 1286717efb6f371753f61b0687420e82d6c39c43..c9bd87aad8b6449a1b73801c82923b427cd458e7 100644 (file)
@@ -97,7 +97,7 @@ void IdoPgsqlConnection::Pause(void)
 
        DbConnection::Pause();
 
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this), PriorityHigh);
        m_QueryQueue.Join();
 }
 
@@ -139,7 +139,7 @@ void IdoPgsqlConnection::TxTimerHandler(void)
 
 void IdoPgsqlConnection::NewTransaction(void)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), true);
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), PriorityHigh, true);
 }
 
 void IdoPgsqlConnection::InternalNewTransaction(void)
@@ -155,7 +155,7 @@ void IdoPgsqlConnection::InternalNewTransaction(void)
 
 void IdoPgsqlConnection::ReconnectTimerHandler(void)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this), PriorityLow);
 }
 
 void IdoPgsqlConnection::Reconnect(void)
@@ -503,7 +503,7 @@ Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int r
 
 void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
 }
 
 void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
@@ -537,7 +537,7 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
 
 void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
 }
 
 void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
@@ -630,7 +630,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
 {
        ASSERT(query.Category != DbCatInvalid);
 
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
 }
 
 void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
@@ -781,7 +781,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
 
 void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
 }
 
 void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
index d4269d8c9397a65e129351b264f5a6c02e141e41..5bccffba4c65dc2eae11c7851f94b54ad15fd666 100644 (file)
@@ -540,7 +540,7 @@ void ApiListener::ApiTimerHandler(void)
 void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
     const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
 {
-       m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true);
+       m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
 }
 
 void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)