]> granicus.if.org Git - icinga2/commitdiff
Implement support for CLIENT_MULTI_STATEMENTS
authorGunnar Beutner <gunnar@beutner.name>
Fri, 13 Mar 2015 19:23:14 +0000 (20:23 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Fri, 13 Mar 2015 19:23:14 +0000 (20:23 +0100)
fixes #8738

lib/db_ido_mysql/idomysqlconnection.cpp
lib/db_ido_mysql/idomysqlconnection.hpp

index b05a3bde2cfeae1e18523ccc0179c3a0601a964b..a75bef97df5c7bee7cee14be426c9faffd32e4a0 100644 (file)
@@ -100,8 +100,6 @@ void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
        Log(LogDebug, "IdoMysqlConnection")
            << "Exception during database operation: " << DiagnosticInformation(exp);
 
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
-
        if (GetConnected()) {
                mysql_close(&m_Connection);
 
@@ -118,8 +116,6 @@ void IdoMysqlConnection::Disconnect(void)
 {
        AssertOnWorkQueue();
 
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
-
        if (!GetConnected())
                return;
 
@@ -136,12 +132,13 @@ void IdoMysqlConnection::TxTimerHandler(void)
 
 void IdoMysqlConnection::NewTransaction(void)
 {
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true));
        m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this));
 }
 
 void IdoMysqlConnection::InternalNewTransaction(void)
 {
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
+       AssertOnWorkQueue();
 
        if (!GetConnected())
                return;
@@ -165,193 +162,200 @@ void IdoMysqlConnection::Reconnect(void)
 
        std::vector<DbObject::Ptr> active_dbobjs;
 
-       {
-               boost::mutex::scoped_lock lock(m_ConnectionMutex);
+       bool reconnect = false;
 
-               bool reconnect = false;
+       if (GetConnected()) {
+               /* Check if we're really still connected */
+               if (mysql_ping(&m_Connection) == 0)
+                       return;
 
-               if (GetConnected()) {
-                       /* Check if we're really still connected */
-                       if (mysql_ping(&m_Connection) == 0)
-                               return;
+               mysql_close(&m_Connection);
+               SetConnected(false);
+               reconnect = true;
+       }
 
-                       mysql_close(&m_Connection);
-                       SetConnected(false);
-                       reconnect = true;
-               }
+       ClearIDCache();
 
-               ClearIDCache();
+       String ihost, isocket_path, iuser, ipasswd, idb;
+       const char *host, *socket_path, *user , *passwd, *db;
+       long port;
 
-               String ihost, isocket_path, iuser, ipasswd, idb;
-               const char *host, *socket_path, *user , *passwd, *db;
-               long port;
+       ihost = GetHost();
+       isocket_path = GetSocketPath();
+       iuser = GetUser();
+       ipasswd = GetPassword();
+       idb = GetDatabase();
 
-               ihost = GetHost();
-               isocket_path = GetSocketPath();
-               iuser = GetUser();
-               ipasswd = GetPassword();
-               idb = GetDatabase();
+       host = (!ihost.IsEmpty()) ? ihost.CStr() : NULL;
+       port = GetPort();
+       socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : NULL;
+       user = (!iuser.IsEmpty()) ? iuser.CStr() : NULL;
+       passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
+       db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
 
-               host = (!ihost.IsEmpty()) ? ihost.CStr() : NULL;
-               port = GetPort();
-               socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : NULL;
-               user = (!iuser.IsEmpty()) ? iuser.CStr() : NULL;
-               passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
-               db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
+       /* connection */
+       if (!mysql_init(&m_Connection)) {
+               Log(LogCritical, "IdoMysqlConnection")
+                   << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\"";
 
-               /* connection */
-               if (!mysql_init(&m_Connection)) {
-                       Log(LogCritical, "IdoMysqlConnection")
-                           << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\"";
+               BOOST_THROW_EXCEPTION(std::bad_alloc());
+       }
 
-                       BOOST_THROW_EXCEPTION(std::bad_alloc());
-               }
+       if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
+               Log(LogCritical, "IdoMysqlConnection")
+                   << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
+                   << "' failed: \"" << mysql_error(&m_Connection) << "\"";
 
-               if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS)) {
-                       Log(LogCritical, "IdoMysqlConnection")
-                           << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
-                           << "' failed: \"" << mysql_error(&m_Connection) << "\"";
+               BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
+       }
 
-                       BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
-               }
+       SetConnected(true);
 
-               SetConnected(true);
+       IdoMysqlResult result = Query("SELECT @@global.max_allowed_packet AS max_allowed_packet");
 
-               String dbVersionName = "idoutils";
-               IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
+       Dictionary::Ptr row = FetchRow(result);
 
-               Dictionary::Ptr row = FetchRow(result);
+       if (row)
+               m_MaxPacketSize = row->Get("max_allowed_packet");
+       else
+               m_MaxPacketSize = 64 * 1024;
 
-               if (!row) {
-                       mysql_close(&m_Connection);
-                       SetConnected(false);
+       DiscardRows(result);
 
-                       Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
+       String dbVersionName = "idoutils";
+       result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
 
-                       Application::Exit(EXIT_FAILURE);
-               }
+       row = FetchRow(result);
 
-               DiscardRows(result);
+       if (!row) {
+               mysql_close(&m_Connection);
+               SetConnected(false);
 
-               String version = row->Get("version");
+               Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
 
-               SetSchemaVersion(version);
+               Application::Exit(EXIT_FAILURE);
+       }
 
-               if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
-                       mysql_close(&m_Connection);
-                       SetConnected(false);
+       DiscardRows(result);
 
-                       Log(LogCritical, "IdoMysqlConnection")
-                           << "Schema version '" << version << "' does not match the required version '"
-                           << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation.";
+       String version = row->Get("version");
 
-                       Application::Exit(EXIT_FAILURE);
-               }
+       SetSchemaVersion(version);
 
-               String instanceName = GetInstanceName();
+       if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
+               mysql_close(&m_Connection);
+               SetConnected(false);
 
-               result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
-               row = FetchRow(result);
+               Log(LogCritical, "IdoMysqlConnection")
+                   << "Schema version '" << version << "' does not match the required version '"
+                   << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation.";
 
-               if (!row) {
-                       Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
-                       m_InstanceID = GetLastInsertID();
-               } else {
-                       m_InstanceID = DbReference(row->Get("instance_id"));
-               }
+               Application::Exit(EXIT_FAILURE);
+       }
 
-               DiscardRows(result);
+       String instanceName = GetInstanceName();
 
-               Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
+       result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
+       row = FetchRow(result);
 
-               /* we have an endpoint in a cluster setup, so decide if we can proceed here */
-               if (my_endpoint && GetHAMode() == HARunOnce) {
-                       /* get the current endpoint writing to programstatus table */
-                       result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
-                           GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
-                       row = FetchRow(result);
-                       DiscardRows(result);
+       if (!row) {
+               Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
+               m_InstanceID = GetLastInsertID();
+       } else {
+               m_InstanceID = DbReference(row->Get("instance_id"));
+       }
 
-                       String endpoint_name;
+       DiscardRows(result);
 
-                       if (row)
-                               endpoint_name = row->Get("endpoint_name");
-                       else
-                               Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
+       Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
 
-                       /* if we did not write into the database earlier, another instance is active */
-                       if (endpoint_name != my_endpoint->GetName()) {
-                               double status_update_time;
+       /* we have an endpoint in a cluster setup, so decide if we can proceed here */
+       if (my_endpoint && GetHAMode() == HARunOnce) {
+               /* get the current endpoint writing to programstatus table */
+               result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
+                   GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
+               row = FetchRow(result);
+               DiscardRows(result);
 
-                               if (row)
-                                       status_update_time = row->Get("status_update_time");
-                               else
-                                       status_update_time = 0;
+               String endpoint_name;
 
-                               double status_update_age = Utility::GetTime() - status_update_time;
+               if (row)
+                       endpoint_name = row->Get("endpoint_name");
+               else
+                       Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
 
-                               Log(LogNotice, "IdoMysqlConnection")
-                                   << "Last update by '" << endpoint_name << "' was " << status_update_age << "s ago.";
+               /* if we did not write into the database earlier, another instance is active */
+               if (endpoint_name != my_endpoint->GetName()) {
+                       double status_update_time;
 
-                               if (status_update_age < GetFailoverTimeout()) {
-                                       mysql_close(&m_Connection);
-                                       SetConnected(false);
-                                       SetShouldConnect(false);
+                       if (row)
+                               status_update_time = row->Get("status_update_time");
+                       else
+                               status_update_time = 0;
 
-                                       return;
-                               }
+                       double status_update_age = Utility::GetTime() - status_update_time;
 
-                               /* activate the IDO only, if we're authoritative in this zone */
-                               if (IsPaused()) {
-                                       Log(LogNotice, "IdoMysqlConnection")
-                                           << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
+                       Log(LogNotice, "IdoMysqlConnection")
+                           << "Last update by '" << endpoint_name << "' was " << status_update_age << "s ago.";
 
-                                       mysql_close(&m_Connection);
-                                       SetConnected(false);
+                       if (status_update_age < GetFailoverTimeout()) {
+                               mysql_close(&m_Connection);
+                               SetConnected(false);
+                               SetShouldConnect(false);
 
-                                       return;
-                               }
+                               return;
                        }
 
-                       Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection.");
+                       /* activate the IDO only, if we're authoritative in this zone */
+                       if (IsPaused()) {
+                               Log(LogNotice, "IdoMysqlConnection")
+                                   << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
+
+                               mysql_close(&m_Connection);
+                               SetConnected(false);
+
+                               return;
+                       }
                }
 
-               Log(LogInformation, "IdoMysqlConnection")
-                   << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
+               Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection.");
+       }
 
-               /* set session time zone to utc */
-               Query("SET SESSION TIME_ZONE='+00:00'");
+       Log(LogInformation, "IdoMysqlConnection")
+           << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
 
-               /* record connection */
-               Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
-                   "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
-                   + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetVersion())
-                   + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
+       /* set session time zone to utc */
+       Query("SET SESSION TIME_ZONE='+00:00'");
 
-               /* clear config tables for the initial config dump */
-               PrepareDatabase();
+       /* record connection */
+       Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
+           "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
+           + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetVersion())
+           + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
 
-               std::ostringstream q1buf;
-               q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
-               result = Query(q1buf.str());
+       /* clear config tables for the initial config dump */
+       PrepareDatabase();
 
-               while ((row = FetchRow(result))) {
-                       DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
+       std::ostringstream q1buf;
+       q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
+       result = Query(q1buf.str());
 
-                       if (!dbtype)
-                               continue;
+       while ((row = FetchRow(result))) {
+               DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
 
-                       DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
-                       SetObjectID(dbobj, DbReference(row->Get("object_id")));
-                       SetObjectActive(dbobj, row->Get("is_active"));
+               if (!dbtype)
+                       continue;
 
-                       if (GetObjectActive(dbobj))
-                               active_dbobjs.push_back(dbobj);
-               }
+               DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
+               SetObjectID(dbobj, DbReference(row->Get("object_id")));
+               SetObjectActive(dbobj, row->Get("is_active"));
 
-               Query("BEGIN");
+               if (GetObjectActive(dbobj))
+                       active_dbobjs.push_back(dbobj);
        }
 
+       Query("BEGIN");
+
        UpdateAllObjects();
 
        /* deactivate all deleted configuration objects */
@@ -370,10 +374,119 @@ void IdoMysqlConnection::ClearConfigTable(const String& table)
        Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID)));
 }
 
+void IdoMysqlConnection::AsyncQuery(const String& query, const boost::function<void (const IdoMysqlResult&)>& callback)
+{
+       AssertOnWorkQueue();
+
+       IdoAsyncQuery aq;
+       aq.Query = query;
+       aq.Callback = callback;
+       m_AsyncQueries.push_back(aq);
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false));
+}
+
+void IdoMysqlConnection::FinishAsyncQueries(bool force)
+{
+       if (m_AsyncQueries.size() < 10 && !force)
+               return;
+
+       std::vector<IdoAsyncQuery> queries;
+       m_AsyncQueries.swap(queries);
+
+       std::vector<IdoAsyncQuery>::size_type offset = 0;
+
+       while (offset < queries.size()) {
+               std::ostringstream querybuf;
+
+               std::vector<IdoAsyncQuery>::size_type count = 0;
+               size_t num_bytes = 0;
+
+               for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
+                       const IdoAsyncQuery& aq = queries[i];
+
+                       size_t size_query = aq.Query.GetLength() + 1;
+
+                       if (num_bytes + size_query > m_MaxPacketSize - 512)
+                               break;
+
+                       if (count > 0)
+                               querybuf << ";";
+
+                       IncreaseQueryCount();
+                       count++;
+
+                       querybuf << aq.Query;
+                       num_bytes += size_query;
+               }
+
+               String query = querybuf.str();
+
+               if (mysql_query(&m_Connection, query.CStr()) != 0) {
+                       std::ostringstream msgbuf;
+                       String message = mysql_error(&m_Connection);
+                       msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
+                       Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+                       BOOST_THROW_EXCEPTION(
+                           database_error()
+                               << errinfo_message(mysql_error(&m_Connection))
+                               << errinfo_database_query(query)
+                       );
+               }
+
+               for (std::vector<IdoAsyncQuery>::size_type i = offset; i < offset + count; i++) {
+                       const IdoAsyncQuery& aq = queries[i];
+
+                       m_AffectedRows = mysql_affected_rows(&m_Connection);
+
+                       MYSQL_RES *result = mysql_use_result(&m_Connection);
+
+                       IdoMysqlResult iresult;
+
+                       if (!result) {
+                               if (mysql_field_count(&m_Connection) > 0) {
+                                       std::ostringstream msgbuf;
+                                       String message = mysql_error(&m_Connection);
+                                       msgbuf << "Error \"" << message << "\" when executing query \"" << aq.Query << "\"";
+                                       Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+                                       BOOST_THROW_EXCEPTION(
+                                           database_error()
+                                               << errinfo_message(mysql_error(&m_Connection))
+                                               << errinfo_database_query(query)
+                                       );
+                               }
+                       } else
+                               iresult = IdoMysqlResult(result, std::ptr_fun(mysql_free_result));
+
+                       if (aq.Callback)
+                               aq.Callback(iresult);
+
+                       if (mysql_next_result(&m_Connection) > 0) {
+                               std::ostringstream msgbuf;
+                               String message = mysql_error(&m_Connection);
+                               msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
+                               Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
+
+                               BOOST_THROW_EXCEPTION(
+                                   database_error()
+                                       << errinfo_message(mysql_error(&m_Connection))
+                                       << errinfo_database_query(query)
+                               );
+                       }
+               }
+
+               offset += count;
+       }
+}
+
 IdoMysqlResult IdoMysqlConnection::Query(const String& query)
 {
        AssertOnWorkQueue();
 
+       /* finish all async queries to maintain the right order for queries */
+       FinishAsyncQueries(true);
+
        Log(LogDebug, "IdoMysqlConnection")
            << "Query: " << query;
 
@@ -483,12 +596,13 @@ void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
 
 void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
 {
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
-       InternalActivateObject(dbobj);
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj));
 }
 
 void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
 {
+       AssertOnWorkQueue();
+
        if (!GetConnected())
                return;
 
@@ -510,13 +624,18 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
                SetObjectID(dbobj, GetLastInsertID());
        } else {
                qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
-               Query(qbuf.str());
+               AsyncQuery(qbuf.str());
        }
 }
 
 void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
 {
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
+       m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj));
+}
+
+void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
+{
+       AssertOnWorkQueue();
 
        if (!GetConnected())
                return;
@@ -528,13 +647,12 @@ void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
 
        std::ostringstream qbuf;
        qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
-       Query(qbuf.str());
+       AsyncQuery(qbuf.str());
 
        /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
         * because the object is still in the database. */
 }
 
-/* caller must hold m_ConnectionMutex */
 bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
 {
        if (key == "instance_id") {
@@ -606,7 +724,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
 
 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
 {
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
+       AssertOnWorkQueue();
 
        if ((query.Category & GetCategories()) == 0)
                return;
@@ -716,11 +834,12 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
        if (type != DbQueryInsert)
                qbuf << where.str();
 
-       Query(qbuf.str());
+       AsyncQuery(qbuf.str(), boost::bind(&IdoMysqlConnection::FinishExecuteQuery, this, query, type, upsert));
+}
 
+void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool upsert)
+{
        if (upsert && GetAffectedRows() == 0) {
-               lock.unlock();
-
                DbQueryType to = DbQueryInsert;
                InternalExecuteQuery(query, &to);
 
@@ -749,12 +868,12 @@ void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String&
 
 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
 {
-       boost::mutex::scoped_lock lock(m_ConnectionMutex);
+       AssertOnWorkQueue();
 
        if (!GetConnected())
                return;
 
-       Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
+       AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
            Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
            " < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
 }
index 623fe465b28eeb6501047194899aa73521919e84..41cae1fd467af2f1684cb7093677cf50941f3746 100644 (file)
@@ -31,6 +31,14 @@ namespace icinga
 
 typedef boost::shared_ptr<MYSQL_RES> IdoMysqlResult;
 
+typedef boost::function<void (const IdoMysqlResult&)> IdoAsyncCallback;
+
+struct IdoAsyncQuery
+{
+       String Query;
+       IdoAsyncCallback Callback;
+};
+
 /**
  * An IDO MySQL database connection.
  *
@@ -64,9 +72,11 @@ private:
 
        WorkQueue m_QueryQueue;
 
-       boost::mutex m_ConnectionMutex;
        MYSQL m_Connection;
        int m_AffectedRows;
+       int m_MaxPacketSize;
+
+       std::vector<IdoAsyncQuery> m_AsyncQueries;
 
        Timer::Ptr m_ReconnectTimer;
        Timer::Ptr m_TxTimer;
@@ -78,8 +88,12 @@ private:
        Dictionary::Ptr FetchRow(const IdoMysqlResult& result);
        void DiscardRows(const IdoMysqlResult& result);
 
+       void AsyncQuery(const String& query, const IdoAsyncCallback& callback = IdoAsyncCallback());
+       void FinishAsyncQueries(bool force = false);
+
        bool FieldToEscapedString(const String& key, const Value& value, Value *result);
        void InternalActivateObject(const DbObject::Ptr& dbobj);
+       void InternalDeactivateObject(const DbObject::Ptr& dbobj);
 
        void Disconnect(void);
        void Reconnect(void);
@@ -90,6 +104,7 @@ private:
        void ReconnectTimerHandler(void);
 
        void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
+       void FinishExecuteQuery(const DbQuery& query, int type, bool upsert);
        void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
        void InternalNewTransaction(void);