REGISTER_STATSFUNCTION(IdoPgsqlConnectionStats, &IdoPgsqlConnection::StatsFunc);
IdoPgsqlConnection::IdoPgsqlConnection(void)
- : m_QueryQueue(500000), m_Connection(NULL)
+ : m_QueryQueue(500000)
{ }
void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
DbConnection::Resume();
SetConnected(false);
- m_Connection = NULL;
m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1));
Log(LogDebug, "IdoPgsqlConnection")
<< "Exception during database operation: " << DiagnosticInformation(exp);
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
-
- if (m_Connection) {
+ if (GetConnected()) {
PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
}
}
{
AssertOnWorkQueue();
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
-
- if (!m_Connection)
+ if (!GetConnected())
return;
Query("COMMIT");
- PQfinish(m_Connection);
+ PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
}
void IdoPgsqlConnection::TxTimerHandler(void)
void IdoPgsqlConnection::InternalNewTransaction(void)
{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ AssertOnWorkQueue();
- if (!m_Connection)
+ if (!GetConnected())
return;
Query("COMMIT");
CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'");
+ SetShouldConnect(true);
+
std::vector<DbObject::Ptr> active_dbobjs;
{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
bool reconnect = false;
- if (m_Connection) {
+ if (GetConnected()) {
/* Check if we're really still connected */
try {
Query("SELECT 1");
return;
} catch (const std::exception&) {
- SetConnected(false);
PQfinish(m_Connection);
- m_Connection = NULL;
+ SetConnected(false);
reconnect = true;
}
}
if (!m_Connection)
return;
- SetConnected(true);
-
if (PQstatus(m_Connection) != CONNECTION_OK) {
String message = PQerrorMessage(m_Connection);
PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
Log(LogCritical, "IdoPgsqlConnection")
<< "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
BOOST_THROW_EXCEPTION(std::runtime_error(message));
}
+ SetConnected(true);
+
String dbVersionName = "idoutils";
IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
if (!row) {
PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
Log(LogCritical, "IdoPgsqlConnection")
<< "Schema version '" << version << "' does not match the required version '"
if (status_update_age < GetFailoverTimeout()) {
PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
+ SetShouldConnect(false);
return;
}
PQfinish(m_Connection);
SetConnected(false);
- m_Connection = NULL;
return;
}
void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
- InternalActivateObject(dbobj);
+ m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj));
}
void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
{
- if (!m_Connection)
+ AssertOnWorkQueue();
+
+ if (!GetConnected())
return;
DbReference dbref = GetObjectID(dbobj);
void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj));
+}
+
+void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
+{
+ AssertOnWorkQueue();
- if (!m_Connection)
+ if (!GetConnected())
return;
DbReference dbref = GetObjectID(dbobj);
* because the object is still in the database. */
}
-/* caller must hold m_ConnectionMutex */
bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
{
if (key == "instance_id") {
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ AssertOnWorkQueue();
if ((query.Category & GetCategories()) == 0)
return;
- if (!m_Connection)
+ if (!GetConnected())
return;
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
Query(qbuf.str());
if (upsert && GetAffectedRows() == 0) {
- lock.unlock();
-
DbQueryType to = DbQueryInsert;
InternalExecuteQuery(query, &to);
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ AssertOnWorkQueue();
- if (!m_Connection)
+ if (!GetConnected())
return;
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +