]> granicus.if.org Git - icinga2/commitdiff
Make sure IDO reconnect attempts don't recurse.
authorGunnar Beutner <gunnar.beutner@netways.de>
Thu, 28 Nov 2013 09:36:43 +0000 (10:36 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Thu, 28 Nov 2013 09:36:43 +0000 (10:36 +0100)
Refs #5235

components/db_ido_mysql/idomysqlconnection.cpp
components/db_ido_pgsql/idopgsqlconnection.cpp
lib/base/workqueue.cpp
lib/base/workqueue.h

index c99b3f49c86fb917ddc40a1dcf0ad5f8e3bf0386..d824d033e47606c458de040074ec8e387fedeecf 100644 (file)
@@ -99,7 +99,7 @@ void IdoMysqlConnection::Disconnect(void)
 
 void IdoMysqlConnection::TxTimerHandler(void)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this), true);
 }
 
 void IdoMysqlConnection::NewTransaction(void)
@@ -465,7 +465,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
 {
        ASSERT(query.Category != DbCatInvalid);
 
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true);
 }
 
 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
@@ -595,7 +595,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
 
 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age));
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
 }
 
 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
index 8bfaff2e1b5e090d43ef6ae8407acbd9e71d2f06..c512af8476fdc9ffd4eca886c0040c215737609d 100644 (file)
@@ -99,7 +99,7 @@ void IdoPgsqlConnection::Disconnect(void)
 
 void IdoPgsqlConnection::TxTimerHandler(void)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this), true);
 }
 
 void IdoPgsqlConnection::NewTransaction(void)
@@ -481,7 +481,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
 {
        ASSERT(query.Category != DbCatInvalid);
 
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true);
 }
 
 void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
@@ -616,7 +616,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
 
 void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
 {
-       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age));
+       m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
 }
 
 void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
index 218c91b3639487acfe6fe15e29f5d14879ca9df2..638264f5f92d74ac77186ac865e11f4875fc2204 100644 (file)
@@ -45,7 +45,7 @@ WorkQueue::~WorkQueue(void)
  * Enqueues a work item. Work items are guaranteed to be executed in the order
  * they were enqueued in.
  */
-void WorkQueue::Enqueue(const WorkCallback& item)
+void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
 {
        boost::mutex::scoped_lock lock(m_Mutex);
 
@@ -58,10 +58,14 @@ void WorkQueue::Enqueue(const WorkCallback& item)
                        m_CV.wait(lock);
        }
 
+       WorkItem item;
+       item.Callback = callback;
+       item.AllowInterleaved = allowInterleaved;
+
        m_Items.push_back(item);
 
        if (wq_thread)
-               ProcessItems(lock);
+               ProcessItems(lock, true);
        else
                m_CV.notify_all();
 }
@@ -93,16 +97,20 @@ void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp)
        throw;
 }
 
-void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock)
+void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
 {
        while (!m_Items.empty()) {
                try {
-                       WorkCallback wi = m_Items.front();
+                       WorkItem wi = m_Items.front();
+
+                       if (interleaved && !wi.AllowInterleaved)
+                               return;
+
                        m_Items.pop_front();
                        m_CV.notify_all();
 
                        lock.unlock();
-                       wi();
+                       wi.Callback();
                } catch (const std::exception& ex) {
                        lock.lock();
 
@@ -132,7 +140,7 @@ void WorkQueue::WorkerThreadProc(void)
                if (m_Joined)
                        break;
 
-               ProcessItems(lock);
+               ProcessItems(lock, false);
        }
 
        m_Stopped = true;
index 71445b8cb586d465198bd7b3147d3296be6d6fbd..cccf625c42dca722d82429c4836b0a9787c8c6bc 100644 (file)
 namespace icinga
 {
 
+typedef boost::function<void (void)> WorkCallback;
+
+struct WorkItem
+{
+
+       WorkCallback Callback;
+       bool AllowInterleaved;
+};
+
 /**
  * A workqueue.
  *
@@ -39,13 +48,12 @@ namespace icinga
 class I2_BASE_API WorkQueue
 {
 public:
-       typedef boost::function<void (void)> WorkCallback;
        typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
 
        WorkQueue(size_t maxItems = 25000);
        ~WorkQueue(void);
 
-       void Enqueue(const WorkCallback& item);
+       void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
        void Join(void);
 
        boost::thread::id GetThreadId(void) const;
@@ -62,10 +70,10 @@ private:
        size_t m_MaxItems;
        bool m_Joined;
        bool m_Stopped;
-       std::deque<WorkCallback> m_Items;
+       std::deque<WorkItem> m_Items;
        ExceptionCallback m_ExceptionCallback;
 
-       void ProcessItems(boost::mutex::scoped_lock& lock);
+       void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
        void WorkerThreadProc(void);
 
        static void DefaultExceptionCallback(boost::exception_ptr exp);