From ce06aa3b56e38b127c9e274106e53f0381176389 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Wed, 30 Oct 2013 13:32:01 +0100 Subject: [PATCH] Implement work queue support for db_ido_mysql. Refs #4758 --- .../db_ido_mysql/idomysqlconnection.cpp | 44 +++++++++++++++++++ components/db_ido_mysql/idomysqlconnection.h | 9 ++++ 2 files changed, 53 insertions(+) diff --git a/components/db_ido_mysql/idomysqlconnection.cpp b/components/db_ido_mysql/idomysqlconnection.cpp index 1a74beb75..b6a0a3749 100644 --- a/components/db_ido_mysql/idomysqlconnection.cpp +++ b/components/db_ido_mysql/idomysqlconnection.cpp @@ -52,12 +52,26 @@ void IdoMysqlConnection::Start(void) m_ReconnectTimer->SetInterval(10); m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::ReconnectTimerHandler, this)); m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); ASSERT(mysql_thread_safe()); } void IdoMysqlConnection::Stop(void) { + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this)); + m_QueryQueue.Join(); +} + +void IdoMysqlConnection::AssertOnWorkQueue(void) +{ + VERIFY(boost::this_thread::get_id() == m_QueryQueue.GetThreadId()); +} + +void IdoMysqlConnection::Disconnect(void) +{ + AssertOnWorkQueue(); + boost::mutex::scoped_lock lock(m_ConnectionMutex); if (!m_Connected) @@ -70,6 +84,11 @@ void IdoMysqlConnection::Stop(void) } void IdoMysqlConnection::TxTimerHandler(void) +{ + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this)); +} + +void IdoMysqlConnection::NewTransaction(void) { boost::mutex::scoped_lock lock(m_ConnectionMutex); @@ -82,6 +101,13 @@ void IdoMysqlConnection::TxTimerHandler(void) void IdoMysqlConnection::ReconnectTimerHandler(void) { + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this)); +} + +void IdoMysqlConnection::Reconnect(void) +{ + AssertOnWorkQueue(); + { boost::mutex::scoped_lock lock(m_ConnectionMutex); @@ -219,6 +245,8 @@ void IdoMysqlConnection::ClearConfigTable(const String& table) Array::Ptr IdoMysqlConnection::Query(const String& query) { + AssertOnWorkQueue(); + Log(LogDebug, "db_ido_mysql", "Query: " + query); if (mysql_query(&m_Connection, query.CStr()) != 0) @@ -251,11 +279,15 @@ Array::Ptr IdoMysqlConnection::Query(const String& query) DbReference IdoMysqlConnection::GetLastInsertID(void) { + AssertOnWorkQueue(); + return DbReference(mysql_insert_id(&m_Connection)); } String IdoMysqlConnection::Escape(const String& s) { + AssertOnWorkQueue(); + ssize_t length = s.GetLength(); char *to = new char[s.GetLength() * 2 + 1]; @@ -270,6 +302,8 @@ String IdoMysqlConnection::Escape(const String& s) Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result) { + AssertOnWorkQueue(); + MYSQL_ROW row; MYSQL_FIELD *field; unsigned long *lengths, i; @@ -402,6 +436,11 @@ bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& va } void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) +{ + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query)); +} + +void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query) { boost::mutex::scoped_lock lock(m_ConnectionMutex); @@ -532,6 +571,11 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) } void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) +{ + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_key, time_value)); +} + +void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value) { boost::mutex::scoped_lock lock(m_ConnectionMutex); diff --git a/components/db_ido_mysql/idomysqlconnection.h b/components/db_ido_mysql/idomysqlconnection.h index df0882052..14bc96669 100644 --- a/components/db_ido_mysql/idomysqlconnection.h +++ b/components/db_ido_mysql/idomysqlconnection.h @@ -71,9 +71,18 @@ private: bool FieldToEscapedString(const String& key, const Value& value, Value *result); void InternalActivateObject(const DbObject::Ptr& dbobj); + void Disconnect(void); + void NewTransaction(void); + void Reconnect(void); + + void AssertOnWorkQueue(void); + void TxTimerHandler(void); void ReconnectTimerHandler(void); + void InternalExecuteQuery(const DbQuery& query); + void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value); + void ClearConfigTables(void); void ClearConfigTable(const String& table); }; -- 2.40.0