From 08303f9aca33b09474f0aa1061c065d8b3e083b7 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Tue, 28 Jan 2014 11:42:12 +0100 Subject: [PATCH] Incrementally fetch MySQL/PGsql rows. Fixes #5374 --- .../db_ido_mysql/idomysqlconnection.cpp | 60 +++++++++--------- components/db_ido_mysql/idomysqlconnection.h | 7 ++- .../db_ido_pgsql/idopgsqlconnection.cpp | 63 ++++++++----------- components/db_ido_pgsql/idopgsqlconnection.h | 6 +- 4 files changed, 64 insertions(+), 72 deletions(-) diff --git a/components/db_ido_mysql/idomysqlconnection.cpp b/components/db_ido_mysql/idomysqlconnection.cpp index 8ef2b22d7..8682087d8 100644 --- a/components/db_ido_mysql/idomysqlconnection.cpp +++ b/components/db_ido_mysql/idomysqlconnection.cpp @@ -165,12 +165,15 @@ void IdoMysqlConnection::Reconnect(void) m_Connected = true; String dbVersionName = "idoutils"; - Array::Ptr version_rows = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); + IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); - if (version_rows->GetLength() == 0) + Dictionary::Ptr version_row = FetchRow(result); + + if (!version_row) BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation.")); - Dictionary::Ptr version_row = version_rows->Get(0); + DiscardRows(result); + String version = version_row->Get("version"); if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) { @@ -180,13 +183,16 @@ void IdoMysqlConnection::Reconnect(void) String instanceName = GetInstanceName(); - Array::Ptr rows = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); + result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); - if (rows->GetLength() == 0) { + Dictionary::Ptr row = FetchRow(result); + + if (!row) { Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); m_InstanceID = GetLastInsertID(); } else { - Dictionary::Ptr row = rows->Get(0); + DiscardRows(result); + m_InstanceID = DbReference(row->Get("instance_id")); } @@ -208,10 +214,9 @@ void IdoMysqlConnection::Reconnect(void) std::ostringstream q1buf; q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast(m_InstanceID); - rows = Query(q1buf.str()); + result = Query(q1buf.str()); - ObjectLock olock(rows); - BOOST_FOREACH(const Dictionary::Ptr& row, rows) { + while ((row = FetchRow(result))) { DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); if (!dbtype) @@ -267,7 +272,7 @@ void IdoMysqlConnection::ClearConfigTable(const String& table) Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID))); } -Array::Ptr IdoMysqlConnection::Query(const String& query) +IdoMysqlResult IdoMysqlConnection::Query(const String& query) { AssertOnWorkQueue(); @@ -290,23 +295,10 @@ Array::Ptr IdoMysqlConnection::Query(const String& query) << errinfo_database_query(query) ); - return Array::Ptr(); + return IdoMysqlResult(); } - Array::Ptr rows = make_shared(); - - for (;;) { - Dictionary::Ptr row = FetchRow(result); - - if (!row) - break; - - rows->Add(row); - } - - mysql_free_result(result); - - return rows; + return IdoMysqlResult(result, std::ptr_fun(mysql_free_result)); } DbReference IdoMysqlConnection::GetLastInsertID(void) @@ -332,7 +324,7 @@ String IdoMysqlConnection::Escape(const String& s) return result; } -Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result) +Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result) { AssertOnWorkQueue(); @@ -340,25 +332,33 @@ Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result) MYSQL_FIELD *field; unsigned long *lengths, i; - row = mysql_fetch_row(result); + row = mysql_fetch_row(result.get()); if (!row) return Dictionary::Ptr(); - lengths = mysql_fetch_lengths(result); + lengths = mysql_fetch_lengths(result.get()); if (!lengths) return Dictionary::Ptr(); Dictionary::Ptr dict = make_shared(); - mysql_field_seek(result, 0); - for (field = mysql_fetch_field(result), i = 0; field; field = mysql_fetch_field(result), i++) + mysql_field_seek(result.get(), 0); + for (field = mysql_fetch_field(result.get()), i = 0; field; field = mysql_fetch_field(result.get()), i++) dict->Set(field->name, String(row[i], row[i] + lengths[i])); return dict; } +void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result) +{ + Dictionary::Ptr row; + + while ((row = FetchRow(result))) + ; /* empty loop body */ +} + void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj) { boost::mutex::scoped_lock lock(m_ConnectionMutex); diff --git a/components/db_ido_mysql/idomysqlconnection.h b/components/db_ido_mysql/idomysqlconnection.h index 63de1c406..d85a17407 100644 --- a/components/db_ido_mysql/idomysqlconnection.h +++ b/components/db_ido_mysql/idomysqlconnection.h @@ -29,6 +29,8 @@ namespace icinga { +typedef shared_ptr IdoMysqlResult; + /** * An IDO MySQL database connection. * @@ -61,10 +63,11 @@ private: Timer::Ptr m_ReconnectTimer; Timer::Ptr m_TxTimer; - Array::Ptr Query(const String& query); + IdoMysqlResult Query(const String& query); DbReference GetLastInsertID(void); String Escape(const String& s); - Dictionary::Ptr FetchRow(MYSQL_RES *result); + Dictionary::Ptr FetchRow(const IdoMysqlResult& result); + void DiscardRows(const IdoMysqlResult& result); bool FieldToEscapedString(const String& key, const Value& value, Value *result); void InternalActivateObject(const DbObject::Ptr& dbobj); diff --git a/components/db_ido_pgsql/idopgsqlconnection.cpp b/components/db_ido_pgsql/idopgsqlconnection.cpp index ecab08e14..67b57f53a 100644 --- a/components/db_ido_pgsql/idopgsqlconnection.cpp +++ b/components/db_ido_pgsql/idopgsqlconnection.cpp @@ -172,12 +172,13 @@ void IdoPgsqlConnection::Reconnect(void) } String dbVersionName = "idoutils"; - Array::Ptr version_rows = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); + IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); - if (version_rows->GetLength() == 0) + Dictionary::Ptr version_row = FetchRow(result, 0); + + if (!version_row) BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation.")); - Dictionary::Ptr version_row = version_rows->Get(0); String version = version_row->Get("version"); if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) { @@ -187,13 +188,14 @@ void IdoPgsqlConnection::Reconnect(void) String instanceName = GetInstanceName(); - Array::Ptr rows = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); + result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); + + Dictionary::Ptr row = FetchRow(result, 0); - if (rows->GetLength() == 0) { + if (!row) { Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id"); } else { - Dictionary::Ptr row = rows->Get(0); m_InstanceID = DbReference(row->Get("instance_id")); } @@ -212,10 +214,12 @@ void IdoPgsqlConnection::Reconnect(void) std::ostringstream q1buf; q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast(m_InstanceID); - rows = Query(q1buf.str()); + result = Query(q1buf.str()); + + int index = 0; + while ((row = FetchRow(result, index))) { + index++; - ObjectLock olock(rows); - BOOST_FOREACH(const Dictionary::Ptr& row, rows) { DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); if (!dbtype) @@ -271,7 +275,7 @@ void IdoPgsqlConnection::ClearConfigTable(const String& table) Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID))); } -Array::Ptr IdoPgsqlConnection::Query(const String& query) +IdoPgsqlResult IdoPgsqlConnection::Query(const String& query) { AssertOnWorkQueue(); @@ -286,7 +290,7 @@ Array::Ptr IdoPgsqlConnection::Query(const String& query) ); if (PQresultStatus(result) == PGRES_COMMAND_OK) - return Array::Ptr(); + return IdoPgsqlResult(); if (PQresultStatus(result) != PGRES_TUPLES_OK) { String message = PQresultErrorMessage(result); @@ -299,35 +303,18 @@ Array::Ptr IdoPgsqlConnection::Query(const String& query) ); } - Array::Ptr rows = make_shared(); - - int rownum = 0; - - for (;;) { - Dictionary::Ptr row = FetchRow(result, rownum); - - if (!row) - break; - - rows->Add(row); - - rownum++; - } - - PQclear(result); - - return rows; + return IdoPgsqlResult(result, std::ptr_fun(PQclear)); } DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column) { AssertOnWorkQueue(); - Array::Ptr rows = Query("SELECT CURRVAL(pg_get_serial_sequence('" + Escape(table) + "', '" + Escape(column) + "')) AS id"); + IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence('" + Escape(table) + "', '" + Escape(column) + "')) AS id"); - ASSERT(rows->GetLength() == 1); + Dictionary::Ptr row = FetchRow(result, 0); - Dictionary::Ptr row = rows->Get(0); + ASSERT(row); std::ostringstream msgbuf; msgbuf << "Sequence Value: " << row->Get("id"); @@ -352,24 +339,24 @@ String IdoPgsqlConnection::Escape(const String& s) return result; } -Dictionary::Ptr IdoPgsqlConnection::FetchRow(PGresult *result, int row) +Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int row) { AssertOnWorkQueue(); - if (row >= PQntuples(result)) + if (row >= PQntuples(result.get())) return Dictionary::Ptr(); - int columns = PQnfields(result); + int columns = PQnfields(result.get()); Dictionary::Ptr dict = make_shared(); for (int column = 0; column < columns; column++) { Value value; - if (!PQgetisnull(result, row, column)) - value = PQgetvalue(result, row, column); + if (!PQgetisnull(result.get(), row, column)) + value = PQgetvalue(result.get(), row, column); - dict->Set(PQfname(result, column), value); + dict->Set(PQfname(result.get(), column), value); } return dict; diff --git a/components/db_ido_pgsql/idopgsqlconnection.h b/components/db_ido_pgsql/idopgsqlconnection.h index fb4af0610..0985e0271 100644 --- a/components/db_ido_pgsql/idopgsqlconnection.h +++ b/components/db_ido_pgsql/idopgsqlconnection.h @@ -29,6 +29,8 @@ namespace icinga { +typedef shared_ptr IdoPgsqlResult; + /** * An IDO pgSQL database connection. * @@ -60,10 +62,10 @@ private: Timer::Ptr m_ReconnectTimer; Timer::Ptr m_TxTimer; - Array::Ptr Query(const String& query); + IdoPgsqlResult Query(const String& query); DbReference GetSequenceValue(const String& table, const String& column); String Escape(const String& s); - Dictionary::Ptr FetchRow(PGresult *result, int row); + Dictionary::Ptr FetchRow(const IdoPgsqlResult& result, int row); bool FieldToEscapedString(const String& key, const Value& value, Value *result); void InternalActivateObject(const DbObject::Ptr& dbobj); -- 2.40.0