REGISTER_TYPE(MysqlDbConnection);
MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
- : DbConnection(serializedUpdate)
+ : DbConnection(serializedUpdate), m_Connected(false)
{
RegisterAttribute("host", Attribute_Config, &m_Host);
RegisterAttribute("port", Attribute_Config, &m_Port);
m_TxTimer->OnTimerExpired.connect(boost::bind(&MysqlDbConnection::TxTimerHandler, this));
m_TxTimer->Start();
- /* TODO: move this to a timer so we can periodically check if we're still connected - and reconnect if necessary */
+ m_ReconnectTimer = boost::make_shared<Timer>();
+ m_ReconnectTimer->SetInterval(10);
+ m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&MysqlDbConnection::ReconnectTimerHandler, this));
+ m_ReconnectTimer->Start();
+
+ ASSERT(mysql_thread_safe());
+}
+
+void MysqlDbConnection::Stop(void)
+{
+ boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ mysql_close(&m_Connection);
+}
+
+void MysqlDbConnection::TxTimerHandler(void)
+{
+ boost::mutex::scoped_lock lock(m_ConnectionMutex);
+
+ if (!m_Connected)
+ return;
+
+ Query("COMMIT");
+ Query("BEGIN");
+}
+
+void MysqlDbConnection::ReconnectTimerHandler(void)
+{
+ boost::mutex::scoped_lock lock(m_ConnectionMutex);
+
+ if (m_Connected) {
+ /* Check if we're really still connected */
+ if (mysql_ping(&m_Connection) == 0)
+ return;
+
+ mysql_close(&m_Connection);
+ m_Connected = false;
+ }
+
String ihost, iuser, ipasswd, idb;
const char *host, *user , *passwd, *db;
long port;
passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
- {
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ if (!mysql_init(&m_Connection))
+ BOOST_THROW_EXCEPTION(std::bad_alloc());
- if (!mysql_init(&m_Connection))
- BOOST_THROW_EXCEPTION(std::bad_alloc());
+ if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0))
+ BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
- if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0))
- BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
+ m_Connected = true;
- String instanceName = "default";
+ String instanceName = "default";
- if (!m_InstanceName.IsEmpty())
- instanceName = m_InstanceName;
+ if (!m_InstanceName.IsEmpty())
+ instanceName = m_InstanceName;
- Array::Ptr rows = Query("SELECT instance_id FROM icinga_instances WHERE instance_name = '" + Escape(instanceName) + "'");
+ Array::Ptr rows = Query("SELECT instance_id FROM icinga_instances WHERE instance_name = '" + Escape(instanceName) + "'");
- if (rows->GetLength() == 0) {
- Query("INSERT INTO icinga_instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + m_InstanceDescription + "')");
- m_InstanceID = GetInsertID();
- } else {
- Dictionary::Ptr row = rows->Get(0);
- m_InstanceID = DbReference(row->Get("instance_id"));
- }
+ if (rows->GetLength() == 0) {
+ Query("INSERT INTO icinga_instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + m_InstanceDescription + "')");
+ m_InstanceID = GetInsertID();
+ } else {
+ Dictionary::Ptr row = rows->Get(0);
+ m_InstanceID = DbReference(row->Get("instance_id"));
+ }
- std::ostringstream msgbuf;
- msgbuf << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID);
- Log(LogInformation, "ido_mysql", msgbuf.str());
+ std::ostringstream msgbuf;
+ msgbuf << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID);
+ Log(LogInformation, "ido_mysql", msgbuf.str());
- Query("UPDATE icinga_objects SET is_active = 0");
+ Query("UPDATE icinga_objects SET is_active = 0");
- std::ostringstream qbuf;
- qbuf << "SELECT object_id, objecttype_id, name1, name2 FROM icinga_objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
- rows = Query(qbuf.str());
+ std::ostringstream qbuf;
+ qbuf << "SELECT object_id, objecttype_id, name1, name2 FROM icinga_objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
+ rows = Query(qbuf.str());
- ObjectLock olock(rows);
- BOOST_FOREACH(const Dictionary::Ptr& row, rows) {
- DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
+ ObjectLock olock(rows);
+ BOOST_FOREACH(const Dictionary::Ptr& row, rows) {
+ DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
- if (!dbtype)
- continue;
+ if (!dbtype)
+ continue;
- DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
- SetReference(dbobj, DbReference(row->Get("object_id")));
- }
+ DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
+ SetReference(dbobj, DbReference(row->Get("object_id")));
}
Query("BEGIN");
UpdateAllObjects();
}
-void MysqlDbConnection::Stop(void)
-{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
- mysql_close(&m_Connection);
-}
-
-void MysqlDbConnection::TxTimerHandler(void)
-{
- boost::mutex::scoped_lock lock(m_ConnectionMutex);
- Query("COMMIT");
- Query("BEGIN");
-}
-
Array::Ptr MysqlDbConnection::Query(const String& query)
{
Log(LogDebug, "ido_mysql", "Query: " + query);
void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) {
boost::mutex::scoped_lock lock(m_ConnectionMutex);
+ /* Check if we can handle updates right now */
+ if (!m_Connected)
+ return;
+
DbReference dbref = GetReference(dbobj);
if (kind == DbObjectRemoved) {
Query(q1buf.str());
}
- //Dictionary::Ptr cols = boost::make_shared<Dictionary>();
-
Dictionary::Ptr fields = dbobj->GetFields();
if (!fields)