]> granicus.if.org Git - icinga2/commitdiff
Implement work queue support for db_ido_mysql.
authorGunnar Beutner <gunnar.beutner@netways.de>
Wed, 30 Oct 2013 12:32:01 +0000 (13:32 +0100)
committerGunnar Beutner <gunnar.beutner@netways.de>
Wed, 30 Oct 2013 12:32:01 +0000 (13:32 +0100)
Refs #4758

components/db_ido_mysql/idomysqlconnection.cpp
components/db_ido_mysql/idomysqlconnection.h

index 1a74beb755a666a6c1318e0b6e024c7e7b0a4b10..b6a0a37496ebad81446d4a3d3bd65f7108ae602d 100644 (file)
@@ -52,12 +52,26 @@ void IdoMysqlConnection::Start(void)
        m_ReconnectTimer->SetInterval(10);
        m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::ReconnectTimerHandler, this));
        m_ReconnectTimer->Start();
+       m_ReconnectTimer->Reschedule(0);
 
        ASSERT(mysql_thread_safe());
 }
 
 void IdoMysqlConnection::Stop(void)
 {
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this));
+       m_QueryQueue.Join();
+}
+
+void IdoMysqlConnection::AssertOnWorkQueue(void)
+{
+       VERIFY(boost::this_thread::get_id() == m_QueryQueue.GetThreadId());
+}
+
+void IdoMysqlConnection::Disconnect(void)
+{
+       AssertOnWorkQueue();
+
        boost::mutex::scoped_lock lock(m_ConnectionMutex);
 
        if (!m_Connected)
@@ -70,6 +84,11 @@ void IdoMysqlConnection::Stop(void)
 }
 
 void IdoMysqlConnection::TxTimerHandler(void)
+{
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this));
+}
+
+void IdoMysqlConnection::NewTransaction(void)
 {
        boost::mutex::scoped_lock lock(m_ConnectionMutex);
 
@@ -82,6 +101,13 @@ void IdoMysqlConnection::TxTimerHandler(void)
 
 void IdoMysqlConnection::ReconnectTimerHandler(void)
 {
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this));
+}
+
+void IdoMysqlConnection::Reconnect(void)
+{
+       AssertOnWorkQueue();
+
        {
                boost::mutex::scoped_lock lock(m_ConnectionMutex);
 
@@ -219,6 +245,8 @@ void IdoMysqlConnection::ClearConfigTable(const String& table)
 
 Array::Ptr IdoMysqlConnection::Query(const String& query)
 {
+       AssertOnWorkQueue();
+
        Log(LogDebug, "db_ido_mysql", "Query: " + query);
 
        if (mysql_query(&m_Connection, query.CStr()) != 0)
@@ -251,11 +279,15 @@ Array::Ptr IdoMysqlConnection::Query(const String& query)
 
 DbReference IdoMysqlConnection::GetLastInsertID(void)
 {
+       AssertOnWorkQueue();
+
        return DbReference(mysql_insert_id(&m_Connection));
 }
 
 String IdoMysqlConnection::Escape(const String& s)
 {
+       AssertOnWorkQueue();
+
        ssize_t length = s.GetLength();
        char *to = new char[s.GetLength() * 2 + 1];
 
@@ -270,6 +302,8 @@ String IdoMysqlConnection::Escape(const String& s)
 
 Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result)
 {
+       AssertOnWorkQueue();
+
        MYSQL_ROW row;
        MYSQL_FIELD *field;
        unsigned long *lengths, i;
@@ -402,6 +436,11 @@ bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& va
 }
 
 void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
+{
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
+}
+
+void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
 {
        boost::mutex::scoped_lock lock(m_ConnectionMutex);
 
@@ -532,6 +571,11 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
 }
 
 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_key, double time_value)
+{
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_key, time_value));
+}
+
+void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value)
 {
        boost::mutex::scoped_lock lock(m_ConnectionMutex);
 
index df08820522376c956f40060e76698c2c89a47f63..14bc966696e8914244ebc23f01c8a8a52d0c3eff 100644 (file)
@@ -71,9 +71,18 @@ private:
        bool FieldToEscapedString(const String& key, const Value& value, Value *result);
        void InternalActivateObject(const DbObject::Ptr& dbobj);
 
+       void Disconnect(void);
+       void NewTransaction(void);
+       void Reconnect(void);
+
+       void AssertOnWorkQueue(void);
+
        void TxTimerHandler(void);
        void ReconnectTimerHandler(void);
 
+       void InternalExecuteQuery(const DbQuery& query);
+        void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
+
        void ClearConfigTables(void);
        void ClearConfigTable(const String& table);
 };