From: Gunnar Beutner Date: Fri, 13 Mar 2015 19:23:14 +0000 (+0100) Subject: Implement support for CLIENT_MULTI_STATEMENTS X-Git-Tag: v2.4.0~830 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=c7c49d7219673d7ec75aedd13bd5c0cd1b330d1c;p=icinga2 Implement support for CLIENT_MULTI_STATEMENTS fixes #8738 --- diff --git a/lib/db_ido_mysql/idomysqlconnection.cpp b/lib/db_ido_mysql/idomysqlconnection.cpp index b05a3bde2..a75bef97d 100644 --- a/lib/db_ido_mysql/idomysqlconnection.cpp +++ b/lib/db_ido_mysql/idomysqlconnection.cpp @@ -100,8 +100,6 @@ void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "IdoMysqlConnection") << "Exception during database operation: " << DiagnosticInformation(exp); - boost::mutex::scoped_lock lock(m_ConnectionMutex); - if (GetConnected()) { mysql_close(&m_Connection); @@ -118,8 +116,6 @@ void IdoMysqlConnection::Disconnect(void) { AssertOnWorkQueue(); - boost::mutex::scoped_lock lock(m_ConnectionMutex); - if (!GetConnected()) return; @@ -136,12 +132,13 @@ void IdoMysqlConnection::TxTimerHandler(void) void IdoMysqlConnection::NewTransaction(void) { + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this)); } void IdoMysqlConnection::InternalNewTransaction(void) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + AssertOnWorkQueue(); if (!GetConnected()) return; @@ -165,193 +162,200 @@ void IdoMysqlConnection::Reconnect(void) std::vector active_dbobjs; - { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + bool reconnect = false; - bool reconnect = false; + if (GetConnected()) { + /* Check if we're really still connected */ + if (mysql_ping(&m_Connection) == 0) + return; - if (GetConnected()) { - /* Check if we're really still connected */ - if (mysql_ping(&m_Connection) == 0) - return; + mysql_close(&m_Connection); + SetConnected(false); + reconnect = true; + } - mysql_close(&m_Connection); - SetConnected(false); - reconnect = true; - } + ClearIDCache(); - ClearIDCache(); + String ihost, isocket_path, iuser, ipasswd, idb; + const char *host, *socket_path, *user , *passwd, *db; + long port; - String ihost, isocket_path, iuser, ipasswd, idb; - const char *host, *socket_path, *user , *passwd, *db; - long port; + ihost = GetHost(); + isocket_path = GetSocketPath(); + iuser = GetUser(); + ipasswd = GetPassword(); + idb = GetDatabase(); - ihost = GetHost(); - isocket_path = GetSocketPath(); - iuser = GetUser(); - ipasswd = GetPassword(); - idb = GetDatabase(); + host = (!ihost.IsEmpty()) ? ihost.CStr() : NULL; + port = GetPort(); + socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : NULL; + user = (!iuser.IsEmpty()) ? iuser.CStr() : NULL; + passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL; + db = (!idb.IsEmpty()) ? idb.CStr() : NULL; - host = (!ihost.IsEmpty()) ? ihost.CStr() : NULL; - port = GetPort(); - socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : NULL; - user = (!iuser.IsEmpty()) ? iuser.CStr() : NULL; - passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL; - db = (!idb.IsEmpty()) ? idb.CStr() : NULL; + /* connection */ + if (!mysql_init(&m_Connection)) { + Log(LogCritical, "IdoMysqlConnection") + << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\""; - /* connection */ - if (!mysql_init(&m_Connection)) { - Log(LogCritical, "IdoMysqlConnection") - << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\""; + BOOST_THROW_EXCEPTION(std::bad_alloc()); + } - BOOST_THROW_EXCEPTION(std::bad_alloc()); - } + if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) { + Log(LogCritical, "IdoMysqlConnection") + << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port + << "' failed: \"" << mysql_error(&m_Connection) << "\""; - if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS)) { - Log(LogCritical, "IdoMysqlConnection") - << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port - << "' failed: \"" << mysql_error(&m_Connection) << "\""; + BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection))); + } - BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection))); - } + SetConnected(true); - SetConnected(true); + IdoMysqlResult result = Query("SELECT @@global.max_allowed_packet AS max_allowed_packet"); - String dbVersionName = "idoutils"; - IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); + Dictionary::Ptr row = FetchRow(result); - Dictionary::Ptr row = FetchRow(result); + if (row) + m_MaxPacketSize = row->Get("max_allowed_packet"); + else + m_MaxPacketSize = 64 * 1024; - if (!row) { - mysql_close(&m_Connection); - SetConnected(false); + DiscardRows(result); - Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation."); + String dbVersionName = "idoutils"; + result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); - Application::Exit(EXIT_FAILURE); - } + row = FetchRow(result); - DiscardRows(result); + if (!row) { + mysql_close(&m_Connection); + SetConnected(false); - String version = row->Get("version"); + Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation."); - SetSchemaVersion(version); + Application::Exit(EXIT_FAILURE); + } - if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) { - mysql_close(&m_Connection); - SetConnected(false); + DiscardRows(result); - Log(LogCritical, "IdoMysqlConnection") - << "Schema version '" << version << "' does not match the required version '" - << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation."; + String version = row->Get("version"); - Application::Exit(EXIT_FAILURE); - } + SetSchemaVersion(version); - String instanceName = GetInstanceName(); + if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) { + mysql_close(&m_Connection); + SetConnected(false); - result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); - row = FetchRow(result); + Log(LogCritical, "IdoMysqlConnection") + << "Schema version '" << version << "' does not match the required version '" + << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation."; - if (!row) { - Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); - m_InstanceID = GetLastInsertID(); - } else { - m_InstanceID = DbReference(row->Get("instance_id")); - } + Application::Exit(EXIT_FAILURE); + } - DiscardRows(result); + String instanceName = GetInstanceName(); - Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint(); + result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); + row = FetchRow(result); - /* we have an endpoint in a cluster setup, so decide if we can proceed here */ - if (my_endpoint && GetHAMode() == HARunOnce) { - /* get the current endpoint writing to programstatus table */ - result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " + - GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID)); - row = FetchRow(result); - DiscardRows(result); + if (!row) { + Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); + m_InstanceID = GetLastInsertID(); + } else { + m_InstanceID = DbReference(row->Get("instance_id")); + } - String endpoint_name; + DiscardRows(result); - if (row) - endpoint_name = row->Get("endpoint_name"); - else - Log(LogNotice, "IdoMysqlConnection", "Empty program status table"); + Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint(); - /* if we did not write into the database earlier, another instance is active */ - if (endpoint_name != my_endpoint->GetName()) { - double status_update_time; + /* we have an endpoint in a cluster setup, so decide if we can proceed here */ + if (my_endpoint && GetHAMode() == HARunOnce) { + /* get the current endpoint writing to programstatus table */ + result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " + + GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID)); + row = FetchRow(result); + DiscardRows(result); - if (row) - status_update_time = row->Get("status_update_time"); - else - status_update_time = 0; + String endpoint_name; - double status_update_age = Utility::GetTime() - status_update_time; + if (row) + endpoint_name = row->Get("endpoint_name"); + else + Log(LogNotice, "IdoMysqlConnection", "Empty program status table"); - Log(LogNotice, "IdoMysqlConnection") - << "Last update by '" << endpoint_name << "' was " << status_update_age << "s ago."; + /* if we did not write into the database earlier, another instance is active */ + if (endpoint_name != my_endpoint->GetName()) { + double status_update_time; - if (status_update_age < GetFailoverTimeout()) { - mysql_close(&m_Connection); - SetConnected(false); - SetShouldConnect(false); + if (row) + status_update_time = row->Get("status_update_time"); + else + status_update_time = 0; - return; - } + double status_update_age = Utility::GetTime() - status_update_time; - /* activate the IDO only, if we're authoritative in this zone */ - if (IsPaused()) { - Log(LogNotice, "IdoMysqlConnection") - << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out."; + Log(LogNotice, "IdoMysqlConnection") + << "Last update by '" << endpoint_name << "' was " << status_update_age << "s ago."; - mysql_close(&m_Connection); - SetConnected(false); + if (status_update_age < GetFailoverTimeout()) { + mysql_close(&m_Connection); + SetConnected(false); + SetShouldConnect(false); - return; - } + return; } - Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection."); + /* activate the IDO only, if we're authoritative in this zone */ + if (IsPaused()) { + Log(LogNotice, "IdoMysqlConnection") + << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out."; + + mysql_close(&m_Connection); + SetConnected(false); + + return; + } } - Log(LogInformation, "IdoMysqlConnection") - << "MySQL IDO instance id: " << static_cast(m_InstanceID) << " (schema version: '" + version + "')"; + Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection."); + } - /* set session time zone to utc */ - Query("SET SESSION TIME_ZONE='+00:00'"); + Log(LogInformation, "IdoMysqlConnection") + << "MySQL IDO instance id: " << static_cast(m_InstanceID) << " (schema version: '" + version + "')"; - /* record connection */ - Query("INSERT INTO " + GetTablePrefix() + "conninfo " + - "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES (" - + Convert::ToString(static_cast(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetVersion()) - + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())"); + /* set session time zone to utc */ + Query("SET SESSION TIME_ZONE='+00:00'"); - /* clear config tables for the initial config dump */ - PrepareDatabase(); + /* record connection */ + Query("INSERT INTO " + GetTablePrefix() + "conninfo " + + "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES (" + + Convert::ToString(static_cast(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetVersion()) + + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())"); - std::ostringstream q1buf; - q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast(m_InstanceID); - result = Query(q1buf.str()); + /* clear config tables for the initial config dump */ + PrepareDatabase(); - while ((row = FetchRow(result))) { - DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); + std::ostringstream q1buf; + q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast(m_InstanceID); + result = Query(q1buf.str()); - if (!dbtype) - continue; + while ((row = FetchRow(result))) { + DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); - DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2")); - SetObjectID(dbobj, DbReference(row->Get("object_id"))); - SetObjectActive(dbobj, row->Get("is_active")); + if (!dbtype) + continue; - if (GetObjectActive(dbobj)) - active_dbobjs.push_back(dbobj); - } + DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2")); + SetObjectID(dbobj, DbReference(row->Get("object_id"))); + SetObjectActive(dbobj, row->Get("is_active")); - Query("BEGIN"); + if (GetObjectActive(dbobj)) + active_dbobjs.push_back(dbobj); } + Query("BEGIN"); + UpdateAllObjects(); /* deactivate all deleted configuration objects */ @@ -370,10 +374,119 @@ void IdoMysqlConnection::ClearConfigTable(const String& table) Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID))); } +void IdoMysqlConnection::AsyncQuery(const String& query, const boost::function& callback) +{ + AssertOnWorkQueue(); + + IdoAsyncQuery aq; + aq.Query = query; + aq.Callback = callback; + m_AsyncQueries.push_back(aq); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false)); +} + +void IdoMysqlConnection::FinishAsyncQueries(bool force) +{ + if (m_AsyncQueries.size() < 10 && !force) + return; + + std::vector queries; + m_AsyncQueries.swap(queries); + + std::vector::size_type offset = 0; + + while (offset < queries.size()) { + std::ostringstream querybuf; + + std::vector::size_type count = 0; + size_t num_bytes = 0; + + for (std::vector::size_type i = offset; i < queries.size(); i++) { + const IdoAsyncQuery& aq = queries[i]; + + size_t size_query = aq.Query.GetLength() + 1; + + if (num_bytes + size_query > m_MaxPacketSize - 512) + break; + + if (count > 0) + querybuf << ";"; + + IncreaseQueryCount(); + count++; + + querybuf << aq.Query; + num_bytes += size_query; + } + + String query = querybuf.str(); + + if (mysql_query(&m_Connection, query.CStr()) != 0) { + std::ostringstream msgbuf; + String message = mysql_error(&m_Connection); + msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\""; + Log(LogCritical, "IdoMysqlConnection", msgbuf.str()); + + BOOST_THROW_EXCEPTION( + database_error() + << errinfo_message(mysql_error(&m_Connection)) + << errinfo_database_query(query) + ); + } + + for (std::vector::size_type i = offset; i < offset + count; i++) { + const IdoAsyncQuery& aq = queries[i]; + + m_AffectedRows = mysql_affected_rows(&m_Connection); + + MYSQL_RES *result = mysql_use_result(&m_Connection); + + IdoMysqlResult iresult; + + if (!result) { + if (mysql_field_count(&m_Connection) > 0) { + std::ostringstream msgbuf; + String message = mysql_error(&m_Connection); + msgbuf << "Error \"" << message << "\" when executing query \"" << aq.Query << "\""; + Log(LogCritical, "IdoMysqlConnection", msgbuf.str()); + + BOOST_THROW_EXCEPTION( + database_error() + << errinfo_message(mysql_error(&m_Connection)) + << errinfo_database_query(query) + ); + } + } else + iresult = IdoMysqlResult(result, std::ptr_fun(mysql_free_result)); + + if (aq.Callback) + aq.Callback(iresult); + + if (mysql_next_result(&m_Connection) > 0) { + std::ostringstream msgbuf; + String message = mysql_error(&m_Connection); + msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\""; + Log(LogCritical, "IdoMysqlConnection", msgbuf.str()); + + BOOST_THROW_EXCEPTION( + database_error() + << errinfo_message(mysql_error(&m_Connection)) + << errinfo_database_query(query) + ); + } + } + + offset += count; + } +} + IdoMysqlResult IdoMysqlConnection::Query(const String& query) { AssertOnWorkQueue(); + /* finish all async queries to maintain the right order for queries */ + FinishAsyncQueries(true); + Log(LogDebug, "IdoMysqlConnection") << "Query: " << query; @@ -483,12 +596,13 @@ void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result) void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); - InternalActivateObject(dbobj); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj)); } void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) { + AssertOnWorkQueue(); + if (!GetConnected()) return; @@ -510,13 +624,18 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) SetObjectID(dbobj, GetLastInsertID()); } else { qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast(dbref); - Query(qbuf.str()); + AsyncQuery(qbuf.str()); } } void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj)); +} + +void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) +{ + AssertOnWorkQueue(); if (!GetConnected()) return; @@ -528,13 +647,12 @@ void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) std::ostringstream qbuf; qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast(dbref); - Query(qbuf.str()); + AsyncQuery(qbuf.str()); /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate * because the object is still in the database. */ } -/* caller must hold m_ConnectionMutex */ bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result) { if (key == "instance_id") { @@ -606,7 +724,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + AssertOnWorkQueue(); if ((query.Category & GetCategories()) == 0) return; @@ -716,11 +834,12 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType if (type != DbQueryInsert) qbuf << where.str(); - Query(qbuf.str()); + AsyncQuery(qbuf.str(), boost::bind(&IdoMysqlConnection::FinishExecuteQuery, this, query, type, upsert)); +} +void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool upsert) +{ if (upsert && GetAffectedRows() == 0) { - lock.unlock(); - DbQueryType to = DbQueryInsert; InternalExecuteQuery(query, &to); @@ -749,12 +868,12 @@ void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + AssertOnWorkQueue(); if (!GetConnected()) return; - Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + + AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID)) + " AND " + time_column + " < FROM_UNIXTIME(" + Convert::ToString(static_cast(max_age)) + ")"); } diff --git a/lib/db_ido_mysql/idomysqlconnection.hpp b/lib/db_ido_mysql/idomysqlconnection.hpp index 623fe465b..41cae1fd4 100644 --- a/lib/db_ido_mysql/idomysqlconnection.hpp +++ b/lib/db_ido_mysql/idomysqlconnection.hpp @@ -31,6 +31,14 @@ namespace icinga typedef boost::shared_ptr IdoMysqlResult; +typedef boost::function IdoAsyncCallback; + +struct IdoAsyncQuery +{ + String Query; + IdoAsyncCallback Callback; +}; + /** * An IDO MySQL database connection. * @@ -64,9 +72,11 @@ private: WorkQueue m_QueryQueue; - boost::mutex m_ConnectionMutex; MYSQL m_Connection; int m_AffectedRows; + int m_MaxPacketSize; + + std::vector m_AsyncQueries; Timer::Ptr m_ReconnectTimer; Timer::Ptr m_TxTimer; @@ -78,8 +88,12 @@ private: Dictionary::Ptr FetchRow(const IdoMysqlResult& result); void DiscardRows(const IdoMysqlResult& result); + void AsyncQuery(const String& query, const IdoAsyncCallback& callback = IdoAsyncCallback()); + void FinishAsyncQueries(bool force = false); + bool FieldToEscapedString(const String& key, const Value& value, Value *result); void InternalActivateObject(const DbObject::Ptr& dbobj); + void InternalDeactivateObject(const DbObject::Ptr& dbobj); void Disconnect(void); void Reconnect(void); @@ -90,6 +104,7 @@ private: void ReconnectTimerHandler(void); void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL); + void FinishExecuteQuery(const DbQuery& query, int type, bool upsert); void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value); void InternalNewTransaction(void);