void IdoMysqlConnection::Resume(void)
{
- if (!IsPaused())
- return;
-
DbConnection::Resume();
m_Connected = false;
void IdoMysqlConnection::Pause(void)
{
- if (!GetEnableHa()) {
- Log(LogInformation, "IdoMysqlConnection", "HA functionality disabled. Won't pause IDO connection: " + GetName());
- return;
- }
-
m_ReconnectTimer.reset();
DbConnection::Pause();
passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
+ /* connection */
if (!mysql_init(&m_Connection)) {
std::ostringstream msgbuf;
msgbuf << "mysql_init() failed: \"" << mysql_error(&m_Connection) << "\"";
String dbVersionName = "idoutils";
IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
- Dictionary::Ptr version_row = FetchRow(result);
+ Dictionary::Ptr row = FetchRow(result);
- if (!version_row) {
+ if (!row) {
Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
Application::Exit(EXIT_FAILURE);
DiscardRows(result);
- String version = version_row->Get("version");
+ String version = row->Get("version");
if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
Log(LogCritical, "IdoMysqlConnection", "Schema version '" + version + "' does not match the required version '" +
String instanceName = GetInstanceName();
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
-
- Dictionary::Ptr row = FetchRow(result);
+ row = FetchRow(result);
if (!row) {
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
m_InstanceID = GetLastInsertID();
} else {
+ m_InstanceID = DbReference(row->Get("instance_id"));
+ }
+
+ DiscardRows(result);
+
+ Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
+
+ /* we have an endpoint in a cluster setup, so decide if we can proceed here */
+ if (my_endpoint && GetHAMode() == HARunOnce) {
+ /* get the current endpoint writing to programstatus table */
+ result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
+ GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
+ row = FetchRow(result);
DiscardRows(result);
- m_InstanceID = DbReference(row->Get("instance_id"));
+ String endpoint_name;
+
+ if (row)
+ endpoint_name = row->Get("endpoint_name");
+ else
+ Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
+
+ /* if we did not write into the database earlier, another instance is active */
+ if (endpoint_name != my_endpoint->GetName()) {
+ double status_update_time;
+
+ if (row)
+ status_update_time = row->Get("status_update_time");
+ else
+ status_update_time = 0;
+
+ double status_update_age = Utility::GetTime() - status_update_time;
+
+ Log(LogNotice, "IdoMysqlConnection", "Last update by '" +
+ endpoint_name + "' was " + Convert::ToString(status_update_age) + "s ago.");
+
+ if (status_update_age < 60) {
+ mysql_close(&m_Connection);
+ m_Connected = false;
+
+ return;
+ }
+
+ /* activate the IDO only, if we're authoritative in this zone */
+ if (IsPaused()) {
+ Log(LogNotice, "IdoMysqlConnection", "Local endpoint '" +
+ my_endpoint->GetName() + "' is not authoritative, bailing out.");
+
+ mysql_close(&m_Connection);
+ m_Connected = false;
+
+ return;
+ }
+ }
+
+ Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection.");
}
std::ostringstream msgbuf;
void IdoPgsqlConnection::Resume(void)
{
- if (!IsPaused())
- return;
-
DbConnection::Resume();
m_Connection = NULL;
void IdoPgsqlConnection::Pause(void)
{
- if (!GetEnableHa()) {
- Log(LogInformation, "IdoMysqlConnection", "HA functionality disabled. Won't pause IDO connection: " + GetName());
- return;
- }
-
m_ReconnectTimer.reset();
DbConnection::Pause();
String dbVersionName = "idoutils";
IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
- Dictionary::Ptr version_row = FetchRow(result, 0);
+ Dictionary::Ptr row = FetchRow(result, 0);
- if (!version_row) {
+ if (!row) {
Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
Application::Exit(EXIT_FAILURE);
}
- String version = version_row->Get("version");
+ String version = row->Get("version");
if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
Log(LogCritical, "IdoPgsqlConnection", "Schema version '" + version + "' does not match the required version '" +
String instanceName = GetInstanceName();
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
-
- Dictionary::Ptr row = FetchRow(result, 0);
+ row = FetchRow(result, 0);
if (!row) {
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
m_InstanceID = DbReference(row->Get("instance_id"));
}
+ Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
+
+ /* we have an endpoint in a cluster setup, so decide if we can proceed here */
+ if (my_endpoint && GetHAMode() == HARunOnce) {
+ /* get the current endpoint writing to programstatus table */
+ result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
+ GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
+ row = FetchRow(result, 0);
+
+ String endpoint_name;
+
+ if (row)
+ endpoint_name = row->Get("endpoint_name");
+ else
+ Log(LogNotice, "IdoPgsqlConnection", "Empty program status table");
+
+ /* if we did not write into the database earlier, another instance is active */
+ if (endpoint_name != my_endpoint->GetName()) {
+ double status_update_time;
+
+ if (row)
+ status_update_time = row->Get("status_update_time");
+ else
+ status_update_time = 0;
+
+ double status_update_age = Utility::GetTime() - status_update_time;
+
+ Log(LogNotice, "IdoPgsqlConnection", "Last update by '" +
+ endpoint_name + "' was " + Convert::ToString(status_update_age) + "s ago.");
+
+ if (status_update_age < 60) {
+ PQfinish(m_Connection);
+ m_Connection = NULL;
+
+ return;
+ }
+
+ /* activate the IDO only, if we're authoritative in this zone */
+ if (IsPaused()) {
+ Log(LogNotice, "IdoPgsqlConnection", "Local endpoint '" +
+ my_endpoint->GetName() + "' is not authoritative, bailing out.");
+
+ PQfinish(m_Connection);
+ m_Connection = NULL;
+
+ return;
+ }
+ }
+
+ Log(LogNotice, "IdoPgsqlConnection", "Enabling IDO connection.");
+ }
+
std::ostringstream msgbuf;
msgbuf << "pgSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
Log(LogInformation, "IdoPgsqlConnection", msgbuf.str());
{
code {{{
+enum HAMode
+{
+ HARunOnce,
+ HARunEverywhere
+};
+
class NameComposer {
public:
virtual String MakeName(const String& shortName, const Dictionary::Ptr props) const = 0;
[get_protected] bool stop_called;
[get_protected] bool pause_called;
[get_protected] bool resume_called;
+ [enum] HAMode ha_mode (HAMode);
Dictionary::Ptr authority_info;
[protected] Dictionary::Ptr extensions;
INITIALIZE_ONCE(&DbConnection::StaticInitialize);
+void DbConnection::OnConfigLoaded(void)
+{
+ DynamicObject::OnConfigLoaded();
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "DbConnection", "HA functionality disabled. Won't pause IDO connection: " + GetName());
+ SetHAMode(HARunEverywhere);
+ }
+}
+
void DbConnection::Start(void)
{
DynamicObject::Start();
//ClearConfigTable("hostgroups");
//ClearConfigTable("hosts");
//ClearConfigTable("hoststatus");
- ClearConfigTable("programstatus");
ClearConfigTable("scheduleddowntime");
ClearConfigTable("service_contactgroups");
ClearConfigTable("service_contacts");
bool GetStatusUpdate(const DbObject::Ptr& dbobj) const;
protected:
+ virtual void OnConfigLoaded(void);
virtual void Start(void);
virtual void Resume(void);
virtual void Pause(void);
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
Endpoint::Ptr endpoint = endpoints[Utility::SDBM(object->GetName()) % endpoints.size()];
- object->SetAuthority(endpoint == my_endpoint);
+ if (object->GetHAMode() == HARunOnce)
+ object->SetAuthority(endpoint == my_endpoint);
}
}
}