if (!mysql_init(&m_Connection))
BOOST_THROW_EXCEPTION(std::bad_alloc());
- if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0))
+ if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, CLIENT_FOUND_ROWS))
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
m_Connected = true;
+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
/* clear config tables for the initial config dump */
- ClearConfigTables();
+ PrepareDatabase();
std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
<< errinfo_database_query(query)
);
+ m_AffectedRows = mysql_affected_rows(&m_Connection);
+
MYSQL_RES *result = mysql_use_result(&m_Connection);
if (!result) {
return DbReference(mysql_insert_id(&m_Connection));
}
+int IdoMysqlConnection::GetAffectedRows(void)
+{
+ AssertOnWorkQueue();
+
+ return m_AffectedRows;
+}
+
String IdoMysqlConnection::Escape(const String& s)
{
AssertOnWorkQueue();
{
ASSERT(query.Category != DbCatInvalid);
- m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true);
+ m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
}
-void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
+void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
}
}
- if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) {
+ type = typeOverride ? *typeOverride : query.Type;
+
+ bool upsert = false;
+
+ if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
bool hasid = false;
ASSERT(query.Object);
else
ASSERT(!"Invalid query flags.");
- if (hasid)
- type = DbQueryUpdate;
- else
- type = DbQueryInsert;
- } else
- type = query.Type;
+ if (!hasid)
+ upsert = true;
+
+ type = DbQueryUpdate;
+ }
switch (type) {
case DbQueryInsert:
Query(qbuf.str());
+ if (upsert && GetAffectedRows() == 0) {
+ lock.unlock();
+
+ DbQueryType to = DbQueryInsert;
+ InternalExecuteQuery(query, &to);
+
+ return;
+ }
+
if (query.Object) {
if (query.ConfigUpdate)
SetConfigUpdate(query.Object, true);
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
" < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
}
+
+void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
+{
+ String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
+ IdoMysqlResult result = Query(query);
+
+ Dictionary::Ptr row;
+
+ while ((row = FetchRow(result))) {
+ SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
+ }
+}
virtual void DeactivateObject(const DbObject::Ptr& dbobj);
virtual void ExecuteQuery(const DbQuery& query);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
+ virtual void FillIDCache(const DbType::Ptr& type);
private:
DbReference m_InstanceID;
boost::mutex m_ConnectionMutex;
bool m_Connected;
MYSQL m_Connection;
+ int m_AffectedRows;
Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer;
IdoMysqlResult Query(const String& query);
DbReference GetLastInsertID(void);
+ int GetAffectedRows(void);
String Escape(const String& s);
Dictionary::Ptr FetchRow(const IdoMysqlResult& result);
void DiscardRows(const IdoMysqlResult& result);
void TxTimerHandler(void);
void ReconnectTimerHandler(void);
- void InternalExecuteQuery(const DbQuery& query);
+ void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void ClearConfigTable(const String& table);
+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
/* clear config tables for the initial config dump */
- ClearConfigTables();
+ PrepareDatabase();
std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
);
}
+ char *rowCount = PQcmdTuples(result);
+ m_AffectedRows = atoi(rowCount);
+
return IdoPgsqlResult(result, std::ptr_fun(PQclear));
}
return DbReference(Convert::ToLong(row->Get("id")));
}
+int IdoPgsqlConnection::GetAffectedRows(void)
+{
+ AssertOnWorkQueue();
+
+ return m_AffectedRows;
+}
+
String IdoPgsqlConnection::Escape(const String& s)
{
AssertOnWorkQueue();
{
ASSERT(query.Category != DbCatInvalid);
- m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true);
+ m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
}
-void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
+void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
}
}
+ type = typeOverride ? *typeOverride : query.Type;
+
+ bool upsert = false;
+
if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) {
bool hasid = false;
else
ASSERT(!"Invalid query flags.");
- if (hasid)
- type = DbQueryUpdate;
- else
- type = DbQueryInsert;
- } else
- type = query.Type;
+ if (!hasid)
+ upsert = true;
+
+ type = DbQueryUpdate;
+ }
switch (type) {
case DbQueryInsert:
Query(qbuf.str());
+ if (upsert && GetAffectedRows() == 0) {
+ lock.unlock();
+
+ DbQueryType to = DbQueryInsert;
+ InternalExecuteQuery(query, &to);
+
+ return;
+ }
+
if (query.Object) {
if (query.ConfigUpdate)
SetConfigUpdate(query.Object, true);
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
" < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ")");
}
+
+void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
+{
+ String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
+ IdoPgsqlResult result = Query(query);
+
+ Dictionary::Ptr row;
+
+ int index = 0;
+ while ((row = FetchRow(result, index))) {
+ index++;
+ SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
+ }
+}
virtual void DeactivateObject(const DbObject::Ptr& dbobj);
virtual void ExecuteQuery(const DbQuery& query);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
+ virtual void FillIDCache(const DbType::Ptr& type);
private:
DbReference m_InstanceID;
boost::mutex m_ConnectionMutex;
PGconn *m_Connection;
+ int m_AffectedRows;
Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer;
IdoPgsqlResult Query(const String& query);
DbReference GetSequenceValue(const String& table, const String& column);
+ int GetAffectedRows(void);
String Escape(const String& s);
Dictionary::Ptr FetchRow(const IdoPgsqlResult& result, int row);
void TxTimerHandler(void);
void ReconnectTimerHandler(void);
- void InternalExecuteQuery(const DbQuery& query);
+ void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void ClearConfigTable(const String& table);
void DbConnection::SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref)
{
+ SetInsertID(dbobj->GetType(), GetObjectID(dbobj), dbref);
+}
+
+void DbConnection::SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref)
+{
+ if (!objid.IsValid())
+ return;
+
if (dbref.IsValid())
- m_InsertIDs[dbobj] = dbref;
+ m_InsertIDs[std::make_pair(type, objid)] = dbref;
else
- m_InsertIDs.erase(dbobj);
+ m_InsertIDs.erase(std::make_pair(type, objid));
}
DbReference DbConnection::GetInsertID(const DbObject::Ptr& dbobj) const
{
- std::map<DbObject::Ptr, DbReference>::const_iterator it;
+ return GetInsertID(dbobj->GetType(), GetObjectID(dbobj));
+}
- it = m_InsertIDs.find(dbobj);
+DbReference DbConnection::GetInsertID(const DbType::Ptr& type, const DbReference& objid) const
+{
+ if (!objid.IsValid())
+ return DbReference();
+
+ std::map<std::pair<DbType::Ptr, DbReference>, DbReference>::const_iterator it;
+
+ it = m_InsertIDs.find(std::make_pair(type, objid));
if (it == m_InsertIDs.end())
return DbReference();
}
}
-void DbConnection::ClearConfigTables(void)
+void DbConnection::PrepareDatabase(void)
{
/* TODO make hardcoded table names modular */
- ClearConfigTable("commands");
+ //ClearConfigTable("commands");
ClearConfigTable("comments");
ClearConfigTable("contact_addresses");
ClearConfigTable("contact_notificationcommands");
ClearConfigTable("contactgroup_members");
- ClearConfigTable("contactgroups");
- ClearConfigTable("contacts");
- ClearConfigTable("contactstatus");
+ //ClearConfigTable("contactgroups");
+ //ClearConfigTable("contacts");
+ //ClearConfigTable("contactstatus");
ClearConfigTable("customvariables");
ClearConfigTable("customvariablestatus");
ClearConfigTable("host_contactgroups");
ClearConfigTable("host_parenthosts");
ClearConfigTable("hostdependencies");
ClearConfigTable("hostgroup_members");
- ClearConfigTable("hostgroups");
- ClearConfigTable("hosts");
- ClearConfigTable("hoststatus");
+ //ClearConfigTable("hostgroups");
+ //ClearConfigTable("hosts");
+ //ClearConfigTable("hoststatus");
ClearConfigTable("programstatus");
ClearConfigTable("scheduleddowntime");
- ClearConfigTable("service_contactgroups");
- ClearConfigTable("service_contacts");
- ClearConfigTable("servicedependencies");
- ClearConfigTable("servicegroup_members");
- ClearConfigTable("servicegroups");
- ClearConfigTable("services");
- ClearConfigTable("servicestatus");
- ClearConfigTable("timeperiod_timeranges");
- ClearConfigTable("timeperiods");
+ //ClearConfigTable("service_contactgroups");
+ //ClearConfigTable("service_contacts");
+ //ClearConfigTable("servicedependencies");
+ //ClearConfigTable("servicegroup_members");
+ //ClearConfigTable("servicegroups");
+ //ClearConfigTable("services");
+ //ClearConfigTable("servicestatus");
+ //ClearConfigTable("timeperiod_timeranges");
+ //ClearConfigTable("timeperiods");
+
+ BOOST_FOREACH(const DbType::Ptr& type, DbType::GetAllTypes()) {
+ FillIDCache(type);
+ }
}
DbReference GetObjectID(const DbObject::Ptr& dbobj) const;
void SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref);
+ void SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref);
DbReference GetInsertID(const DbObject::Ptr& dbobj) const;
+ DbReference GetInsertID(const DbType::Ptr& type, const DbReference& objid) const;
void SetNotificationInsertID(const DynamicObject::Ptr& obj, const DbReference& dbref);
DbReference GetNotificationInsertID(const DynamicObject::Ptr& obj) const;
virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0;
virtual void CleanUpExecuteQuery(const String& table, const String& time_column, double max_age);
+ virtual void FillIDCache(const DbType::Ptr& type) = 0;
void UpdateAllObjects(void);
- void ClearConfigTables(void);
+ void PrepareDatabase(void);
private:
std::map<DbObject::Ptr, DbReference> m_ObjectIDs;
- std::map<DbObject::Ptr, DbReference> m_InsertIDs;
+ std::map<std::pair<DbType::Ptr, DbReference>, DbReference> m_InsertIDs;
std::map<DynamicObject::Ptr, DbReference> m_NotificationInsertIDs;
std::set<DbObject::Ptr> m_ActiveObjects;
std::set<DbObject::Ptr> m_ConfigUpdates;
static DbType::TypeMap tm;
return tm;
}
+
+std::set<DbType::Ptr> DbType::GetAllTypes(void)
+{
+ std::set<DbType::Ptr> result;
+
+ boost::mutex::scoped_lock lock(GetStaticMutex());
+ std::pair<String, DbType::Ptr> kv;
+ BOOST_FOREACH(kv, GetTypes()) {
+ result.insert(kv.second);
+ }
+
+ return result;
+}
shared_ptr<DbObject> GetOrCreateObjectByName(const String& name1, const String& name2);
+ static std::set<DbType::Ptr> GetAllTypes(void);
+
private:
std::vector<String> m_Names;
String m_Table;