]> granicus.if.org Git - icinga2/blob - components/db_ido_mysql/idomysqlconnection.cpp
14e1d3eae41fcad4b4ca7c0b9bf64ccd3b128d64
[icinga2] / components / db_ido_mysql / idomysqlconnection.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "base/logger_fwd.h"
21 #include "base/objectlock.h"
22 #include "base/convert.h"
23 #include "base/utility.h"
24 #include "base/application.h"
25 #include "base/dynamictype.h"
26 #include "base/exception.h"
27 #include "base/statsfunction.h"
28 #include "db_ido/dbtype.h"
29 #include "db_ido/dbvalue.h"
30 #include "db_ido_mysql/idomysqlconnection.h"
31 #include <boost/tuple/tuple.hpp>
32 #include <boost/foreach.hpp>
33
34 using namespace icinga;
35
36 #define SCHEMA_VERSION "1.11.0"
37
38 REGISTER_TYPE(IdoMysqlConnection);
39 REGISTER_STATSFUNCTION(IdoMysqlConnectionStats, &IdoMysqlConnection::StatsFunc);
40
41 Value IdoMysqlConnection::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
42 {
43         Dictionary::Ptr nodes = make_shared<Dictionary>();
44
45         BOOST_FOREACH(const IdoMysqlConnection::Ptr& idomysqlconnection, DynamicType::GetObjects<IdoMysqlConnection>()) {
46                 size_t items = idomysqlconnection->m_QueryQueue.GetLength();
47
48                 Dictionary::Ptr stats = make_shared<Dictionary>();
49                 stats->Set("version", SCHEMA_VERSION);
50                 stats->Set("instance_name", idomysqlconnection->GetInstanceName());
51                 stats->Set("query_queue_items", items);
52
53                 nodes->Set(idomysqlconnection->GetName(), stats);
54
55                 perfdata->Set("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_items", Convert::ToDouble(items));
56         }
57
58         status->Set("idomysqlconnection", nodes);
59
60         return 0;
61 }
62
63 void IdoMysqlConnection::Start(void)
64 {
65         DbConnection::Start();
66
67         m_Connected = false;
68
69         m_QueryQueue.SetExceptionCallback(boost::bind(&IdoMysqlConnection::ExceptionHandler, this, _1));
70
71         m_TxTimer = make_shared<Timer>();
72         m_TxTimer->SetInterval(5);
73         m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::TxTimerHandler, this));
74         m_TxTimer->Start();
75
76         m_ReconnectTimer = make_shared<Timer>();
77         m_ReconnectTimer->SetInterval(10);
78         m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::ReconnectTimerHandler, this));
79         m_ReconnectTimer->Start();
80         m_ReconnectTimer->Reschedule(0);
81
82         ASSERT(mysql_thread_safe());
83 }
84
85 void IdoMysqlConnection::Stop(void)
86 {
87         m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this));
88         m_QueryQueue.Join();
89 }
90
91 void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
92 {
93         Log(LogCritical, "db_ido_mysql", "Exception during database operation: " + DiagnosticInformation(exp));
94
95         boost::mutex::scoped_lock lock(m_ConnectionMutex);
96
97         if (m_Connected) {
98                 mysql_close(&m_Connection);
99
100                 m_Connected = false;
101         }
102 }
103
104 void IdoMysqlConnection::AssertOnWorkQueue(void)
105 {
106         ASSERT(boost::this_thread::get_id() == m_QueryQueue.GetThreadId());
107 }
108
109 void IdoMysqlConnection::Disconnect(void)
110 {
111         AssertOnWorkQueue();
112
113         boost::mutex::scoped_lock lock(m_ConnectionMutex);
114
115         if (!m_Connected)
116                 return;
117
118         Query("COMMIT");
119         mysql_close(&m_Connection);
120
121         m_Connected = false;
122 }
123
124 void IdoMysqlConnection::TxTimerHandler(void)
125 {
126         m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this), true);
127 }
128
129 void IdoMysqlConnection::NewTransaction(void)
130 {
131         boost::mutex::scoped_lock lock(m_ConnectionMutex);
132
133         if (!m_Connected)
134                 return;
135
136         Query("COMMIT");
137         Query("BEGIN");
138 }
139
140 void IdoMysqlConnection::ReconnectTimerHandler(void)
141 {
142         m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this));
143 }
144
145 void IdoMysqlConnection::Reconnect(void)
146 {
147         AssertOnWorkQueue();
148
149         CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'");
150
151         std::vector<DbObject::Ptr> active_dbobjs;
152
153         {
154                 boost::mutex::scoped_lock lock(m_ConnectionMutex);
155
156                 bool reconnect = false;
157
158                 if (m_Connected) {
159                         /* Check if we're really still connected */
160                         if (mysql_ping(&m_Connection) == 0)
161                                 return;
162
163                         mysql_close(&m_Connection);
164                         m_Connected = false;
165                         reconnect = true;
166                 }
167
168                 ClearIDCache();
169
170                 String ihost, iuser, ipasswd, idb;
171                 const char *host, *user , *passwd, *db;
172                 long port;
173
174                 ihost = GetHost();
175                 iuser = GetUser();
176                 ipasswd = GetPassword();
177                 idb = GetDatabase();
178
179                 host = (!ihost.IsEmpty()) ? ihost.CStr() : NULL;
180                 port = GetPort();
181                 user = (!iuser.IsEmpty()) ? iuser.CStr() : NULL;
182                 passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
183                 db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
184
185                 if (!mysql_init(&m_Connection))
186                         BOOST_THROW_EXCEPTION(std::bad_alloc());
187
188                 if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, CLIENT_FOUND_ROWS))
189                         BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
190
191                 m_Connected = true;
192
193                 String dbVersionName = "idoutils";
194                 IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
195
196                 Dictionary::Ptr version_row = FetchRow(result);
197
198                 if (!version_row)
199                         BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation."));
200
201                 DiscardRows(result);
202
203                 String version = version_row->Get("version");
204
205                 if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
206                         BOOST_THROW_EXCEPTION(std::runtime_error("Schema version '" + version + "' does not match the required version '" +
207                            SCHEMA_VERSION + "'! Please check the upgrade documentation."));
208                 }
209
210                 String instanceName = GetInstanceName();
211
212                 result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
213
214                 Dictionary::Ptr row = FetchRow(result);
215
216                 if (!row) {
217                         Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
218                         m_InstanceID = GetLastInsertID();
219                 } else {
220                         DiscardRows(result);
221
222                         m_InstanceID = DbReference(row->Get("instance_id"));
223                 }
224
225                 std::ostringstream msgbuf;
226                 msgbuf << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
227                 Log(LogInformation, "db_ido_mysql", msgbuf.str());
228
229                 /* set session time zone to utc */
230                 Query("SET SESSION TIME_ZONE='+00:00'");
231
232                 /* record connection */
233                 Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
234                     "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
235                     + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetVersion())
236                     + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
237
238                 /* clear config tables for the initial config dump */
239                 PrepareDatabase();
240
241                 std::ostringstream q1buf;
242                 q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
243                 result = Query(q1buf.str());
244
245                 while ((row = FetchRow(result))) {
246                         DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
247
248                         if (!dbtype)
249                                 continue;
250
251                         DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
252                         SetObjectID(dbobj, DbReference(row->Get("object_id")));
253                         SetObjectActive(dbobj, row->Get("is_active"));
254
255                         if (GetObjectActive(dbobj))
256                                 active_dbobjs.push_back(dbobj);
257                 }
258
259                 Query("BEGIN");
260         }
261
262         UpdateAllObjects();
263
264         /* deactivate all deleted configuration objects */
265         BOOST_FOREACH(const DbObject::Ptr& dbobj, active_dbobjs) {
266                 if (dbobj->GetObject() == NULL) {
267                         Log(LogDebug, "db_ido", "Deactivate deleted object name1: '" + Convert::ToString(dbobj->GetName1() +
268                             "' name2: '" + Convert::ToString(dbobj->GetName2() + "'.")));
269                         DeactivateObject(dbobj);
270                 }
271         }
272 }
273
274 void IdoMysqlConnection::ClearConfigTable(const String& table)
275 {
276         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID)));
277 }
278
279 IdoMysqlResult IdoMysqlConnection::Query(const String& query)
280 {
281         AssertOnWorkQueue();
282
283         Log(LogDebug, "db_ido_mysql", "Query: " + query);
284
285         if (mysql_query(&m_Connection, query.CStr()) != 0)
286                 BOOST_THROW_EXCEPTION(
287                     database_error()
288                         << errinfo_message(mysql_error(&m_Connection))
289                         << errinfo_database_query(query)
290                 );
291
292         m_AffectedRows = mysql_affected_rows(&m_Connection);
293
294         MYSQL_RES *result = mysql_use_result(&m_Connection);
295
296         if (!result) {
297                 if (mysql_field_count(&m_Connection) > 0)
298                         BOOST_THROW_EXCEPTION(
299                             database_error()
300                                 << errinfo_message(mysql_error(&m_Connection))
301                                 << errinfo_database_query(query)
302                         );
303
304                 return IdoMysqlResult();
305         }
306
307         return IdoMysqlResult(result, std::ptr_fun(mysql_free_result));
308 }
309
310 DbReference IdoMysqlConnection::GetLastInsertID(void)
311 {
312         AssertOnWorkQueue();
313
314         return DbReference(mysql_insert_id(&m_Connection));
315 }
316
317 int IdoMysqlConnection::GetAffectedRows(void)
318 {
319         AssertOnWorkQueue();
320
321         return m_AffectedRows;
322 }
323
324 String IdoMysqlConnection::Escape(const String& s)
325 {
326         AssertOnWorkQueue();
327
328         size_t length = s.GetLength();
329         char *to = new char[s.GetLength() * 2 + 1];
330
331         mysql_real_escape_string(&m_Connection, to, s.CStr(), length);
332
333         String result = String(to);
334
335         delete [] to;
336
337         return result;
338 }
339
340 Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result)
341 {
342         AssertOnWorkQueue();
343
344         MYSQL_ROW row;
345         MYSQL_FIELD *field;
346         unsigned long *lengths, i;
347
348         row = mysql_fetch_row(result.get());
349
350         if (!row)
351                 return Dictionary::Ptr();
352
353         lengths = mysql_fetch_lengths(result.get());
354
355         if (!lengths)
356                 return Dictionary::Ptr();
357
358         Dictionary::Ptr dict = make_shared<Dictionary>();
359
360         mysql_field_seek(result.get(), 0);
361         for (field = mysql_fetch_field(result.get()), i = 0; field; field = mysql_fetch_field(result.get()), i++)
362                 dict->Set(field->name, String(row[i], row[i] + lengths[i]));
363
364         return dict;
365 }
366
367 void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
368 {
369         Dictionary::Ptr row;
370
371         while ((row = FetchRow(result)))
372                 ; /* empty loop body */
373 }
374
375 void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
376 {
377         boost::mutex::scoped_lock lock(m_ConnectionMutex);
378         InternalActivateObject(dbobj);
379 }
380
381 void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
382 {
383         if (!m_Connected)
384                 return;
385
386         DbReference dbref = GetObjectID(dbobj);
387         std::ostringstream qbuf;
388
389         if (!dbref.IsValid()) {
390                 qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
391                       << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
392                       << "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)";
393                 Query(qbuf.str());
394                 SetObjectID(dbobj, GetLastInsertID());
395         } else {
396                 qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
397                 Query(qbuf.str());
398         }
399 }
400
401 void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
402 {
403         boost::mutex::scoped_lock lock(m_ConnectionMutex);
404
405         if (!m_Connected)
406                 return;
407
408         DbReference dbref = GetObjectID(dbobj);
409
410         if (!dbref.IsValid())
411                 return;
412
413         std::ostringstream qbuf;
414         qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
415         Query(qbuf.str());
416
417         /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
418          * because the object is still in the database. */
419 }
420
421 /* caller must hold m_ConnectionMutex */
422 bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
423 {
424         if (key == "instance_id") {
425                 *result = static_cast<long>(m_InstanceID);
426                 return true;
427         }
428         if (key == "notification_id") {
429                 *result = static_cast<long>(GetNotificationInsertID(value));
430                 return true;
431         }
432
433         Value rawvalue = DbValue::ExtractValue(value);
434
435         if (rawvalue.IsObjectType<DynamicObject>()) {
436                 DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
437
438                 if (!dbobjcol) {
439                         *result = 0;
440                         return true;
441                 }
442
443                 DbReference dbrefcol;
444
445                 if (DbValue::IsObjectInsertID(value)) {
446                         dbrefcol = GetInsertID(dbobjcol);
447
448                         ASSERT(dbrefcol.IsValid());
449                 } else {
450                         dbrefcol = GetObjectID(dbobjcol);
451
452                         if (!dbrefcol.IsValid()) {
453                                 InternalActivateObject(dbobjcol);
454
455                                 dbrefcol = GetObjectID(dbobjcol);
456
457                                 if (!dbrefcol.IsValid())
458                                         return false;
459                         }
460                 }
461
462                 *result = static_cast<long>(dbrefcol);
463         } else if (DbValue::IsTimestamp(value)) {
464                 long ts = rawvalue;
465                 std::ostringstream msgbuf;
466                 msgbuf << "FROM_UNIXTIME(" << ts << ")";
467                 *result = Value(msgbuf.str());
468         } else if (DbValue::IsTimestampNow(value)) {
469                 *result = "NOW()";
470         } else {
471                 *result = "'" + Escape(rawvalue) + "'";
472         }
473
474         return true;
475 }
476
477 void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
478 {
479         ASSERT(query.Category != DbCatInvalid);
480
481         m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
482 }
483
484 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
485 {
486         boost::mutex::scoped_lock lock(m_ConnectionMutex);
487
488         if ((query.Category & GetCategories()) == 0)
489                 return;
490
491         if (!m_Connected)
492                 return;
493
494         std::ostringstream qbuf, where;
495         int type;
496
497         if (query.WhereCriteria) {
498                 where << " WHERE ";
499
500                 ObjectLock olock(query.WhereCriteria);
501                 Value value;
502                 bool first = true;
503
504                 BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) {
505                         if (!FieldToEscapedString(kv.first, kv.second, &value))
506                                 return;
507
508                         if (!first)
509                                 where << " AND ";
510
511                         where << kv.first << " = " << value;
512
513                         if (first)
514                                 first = false;
515                 }
516         }
517
518         type = typeOverride ? *typeOverride : query.Type;
519
520         bool upsert = false;
521
522         if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
523                 bool hasid = false;
524
525                 ASSERT(query.Object);
526
527                 if (query.ConfigUpdate)
528                         hasid = GetConfigUpdate(query.Object);
529                 else if (query.StatusUpdate)
530                         hasid = GetStatusUpdate(query.Object);
531                 else
532                         ASSERT(!"Invalid query flags.");
533
534                 if (!hasid)
535                         upsert = true;
536
537                 type = DbQueryUpdate;
538         }
539
540         switch (type) {
541                 case DbQueryInsert:
542                         qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
543                         break;
544                 case DbQueryUpdate:
545                         qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
546                         break;
547                 case DbQueryDelete:
548                         qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
549                         break;
550                 default:
551                         ASSERT(!"Invalid query type.");
552         }
553
554         if (type == DbQueryInsert || type == DbQueryUpdate) {
555                 std::ostringstream colbuf, valbuf;
556
557                 ObjectLock olock(query.Fields);
558
559                 bool first = true;
560                 BOOST_FOREACH(const Dictionary::Pair& kv, query.Fields) {
561                         Value value;
562
563                         if (!FieldToEscapedString(kv.first, kv.second, &value))
564                                 return;
565
566                         if (type == DbQueryInsert) {
567                                 if (!first) {
568                                         colbuf << ", ";
569                                         valbuf << ", ";
570                                 }
571
572                                 colbuf << kv.first;
573                                 valbuf << value;
574                         } else {
575                                 if (!first)
576                                         qbuf << ", ";
577
578                                 qbuf << " " << kv.first << " = " << value;
579                         }
580
581                         if (first)
582                                 first = false;
583                 }
584
585                 if (type == DbQueryInsert)
586                         qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
587         }
588
589         if (type != DbQueryInsert)
590                 qbuf << where.str();
591
592         Query(qbuf.str());
593
594         if (upsert && GetAffectedRows() == 0) {
595                 lock.unlock();
596
597                 DbQueryType to = DbQueryInsert;
598                 InternalExecuteQuery(query, &to);
599
600                 return;
601         }
602
603         if (query.Object) {
604                 if (query.ConfigUpdate)
605                         SetConfigUpdate(query.Object, true);
606                 else if (query.StatusUpdate)
607                         SetStatusUpdate(query.Object, true);
608
609                 if (type == DbQueryInsert && query.ConfigUpdate)
610                         SetInsertID(query.Object, GetLastInsertID());
611         }
612
613         if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationObject) { // FIXME remove hardcoded table name
614                 SetNotificationInsertID(query.NotificationObject, GetLastInsertID());
615                 Log(LogDebug, "db_ido", "saving contactnotification notification_id=" + Convert::ToString(static_cast<long>(GetLastInsertID())));
616         }
617 }
618
619 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
620 {
621         m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
622 }
623
624 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
625 {
626         boost::mutex::scoped_lock lock(m_ConnectionMutex);
627
628         if (!m_Connected)
629                 return;
630
631         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
632             Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
633             " < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
634 }
635
636 void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
637 {
638         String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
639         IdoMysqlResult result = Query(query);
640
641         Dictionary::Ptr row;
642
643         while ((row = FetchRow(result))) {
644                 SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
645         }
646 }