]> granicus.if.org Git - icinga2/blob - lib/db_ido_mysql/idomysqlconnection.cpp
add some object locking to the Dump method (which could theoretically suffer from...
[icinga2] / lib / db_ido_mysql / idomysqlconnection.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "db_ido_mysql/idomysqlconnection.hpp"
4 #include "db_ido_mysql/idomysqlconnection-ti.cpp"
5 #include "db_ido/dbtype.hpp"
6 #include "db_ido/dbvalue.hpp"
7 #include "base/logger.hpp"
8 #include "base/objectlock.hpp"
9 #include "base/convert.hpp"
10 #include "base/utility.hpp"
11 #include "base/perfdatavalue.hpp"
12 #include "base/application.hpp"
13 #include "base/configtype.hpp"
14 #include "base/exception.hpp"
15 #include "base/statsfunction.hpp"
16 #include <boost/tuple/tuple.hpp>
17 #include <utility>
18
using namespace icinga;

/* Registration macros for this type and its stats function. Their exact
 * effect is defined by the macro implementations, which are not part of
 * this file. */
REGISTER_TYPE(IdoMysqlConnection);
REGISTER_STATSFUNCTION(IdoMysqlConnection, &IdoMysqlConnection::StatsFunc);
23
24 void IdoMysqlConnection::OnConfigLoaded()
25 {
26         ObjectImpl<IdoMysqlConnection>::OnConfigLoaded();
27
28         m_QueryQueue.SetName("IdoMysqlConnection, " + GetName());
29
30         Library shimLibrary{"mysql_shim"};
31
32         auto create_mysql_shim = shimLibrary.GetSymbolAddress<create_mysql_shim_ptr>("create_mysql_shim");
33
34         m_Mysql.reset(create_mysql_shim());
35
36         std::swap(m_Library, shimLibrary);
37 }
38
39 void IdoMysqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
40 {
41         DictionaryData nodes;
42
43         for (const IdoMysqlConnection::Ptr& idomysqlconnection : ConfigType::GetObjectsByType<IdoMysqlConnection>()) {
44                 size_t queryQueueItems = idomysqlconnection->m_QueryQueue.GetLength();
45                 double queryQueueItemRate = idomysqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0;
46
47                 nodes.emplace_back(idomysqlconnection->GetName(), new Dictionary({
48                         { "version", idomysqlconnection->GetSchemaVersion() },
49                         { "instance_name", idomysqlconnection->GetInstanceName() },
50                         { "connected", idomysqlconnection->GetConnected() },
51                         { "query_queue_items", queryQueueItems },
52                         { "query_queue_item_rate", queryQueueItemRate }
53                 }));
54
55                 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_rate", idomysqlconnection->GetQueryCount(60) / 60.0));
56                 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_1min", idomysqlconnection->GetQueryCount(60)));
57                 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_5mins", idomysqlconnection->GetQueryCount(5 * 60)));
58                 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_queries_15mins", idomysqlconnection->GetQueryCount(15 * 60)));
59                 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_items", queryQueueItems));
60                 perfdata->Add(new PerfdataValue("idomysqlconnection_" + idomysqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate));
61         }
62
63         status->Set("idomysqlconnection", new Dictionary(std::move(nodes)));
64 }
65
66 void IdoMysqlConnection::Resume()
67 {
68         Log(LogInformation, "IdoMysqlConnection")
69                 << "'" << GetName() << "' resumed.";
70
71         SetConnected(false);
72
73         m_QueryQueue.SetExceptionCallback(std::bind(&IdoMysqlConnection::ExceptionHandler, this, _1));
74
75         /* Immediately try to connect on Resume() without timer. */
76         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Reconnect, this), PriorityImmediate);
77
78         m_TxTimer = new Timer();
79         m_TxTimer->SetInterval(1);
80         m_TxTimer->OnTimerExpired.connect(std::bind(&IdoMysqlConnection::TxTimerHandler, this));
81         m_TxTimer->Start();
82
83         m_ReconnectTimer = new Timer();
84         m_ReconnectTimer->SetInterval(10);
85         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&IdoMysqlConnection::ReconnectTimerHandler, this));
86         m_ReconnectTimer->Start();
87
88         /* Start with queries after connect. */
89         DbConnection::Resume();
90
91         ASSERT(m_Mysql->thread_safe());
92 }
93
94 void IdoMysqlConnection::Pause()
95 {
96         Log(LogDebug, "IdoMysqlConnection")
97                 << "Attempting to pause '" << GetName() << "'.";
98
99         DbConnection::Pause();
100
101         m_ReconnectTimer.reset();
102
103 #ifdef I2_DEBUG /* I2_DEBUG */
104         Log(LogDebug, "IdoMysqlConnection")
105                 << "Rescheduling disconnect task.";
106 #endif /* I2_DEBUG */
107
108         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Disconnect, this), PriorityLow);
109
110         /* Work on remaining tasks but never delete the threads, for HA resuming later. */
111         m_QueryQueue.Join();
112
113         Log(LogInformation, "IdoMysqlConnection")
114                 << "'" << GetName() << "' paused.";
115
116 }
117
118 void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
119 {
120         Log(LogCritical, "IdoMysqlConnection", "Exception during database operation: Verify that your database is operational!");
121
122         Log(LogDebug, "IdoMysqlConnection")
123                 << "Exception during database operation: " << DiagnosticInformation(std::move(exp));
124
125         if (GetConnected()) {
126                 m_Mysql->close(&m_Connection);
127
128                 SetConnected(false);
129         }
130 }
131
132 void IdoMysqlConnection::AssertOnWorkQueue()
133 {
134         ASSERT(m_QueryQueue.IsWorkerThread());
135 }
136
137 void IdoMysqlConnection::Disconnect()
138 {
139         AssertOnWorkQueue();
140
141         if (!GetConnected())
142                 return;
143
144         Query("COMMIT");
145         m_Mysql->close(&m_Connection);
146
147         SetConnected(false);
148
149         Log(LogInformation, "IdoMysqlConnection")
150                 << "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'.";
151 }
152
153 void IdoMysqlConnection::TxTimerHandler()
154 {
155         NewTransaction();
156 }
157
158 void IdoMysqlConnection::NewTransaction()
159 {
160         if (IsPaused())
161                 return;
162
163 #ifdef I2_DEBUG /* I2_DEBUG */
164         Log(LogDebug, "IdoMysqlConnection")
165                 << "Scheduling new transaction and finishing async queries.";
166 #endif /* I2_DEBUG */
167
168         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalNewTransaction, this), PriorityNormal);
169         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::FinishAsyncQueries, this), PriorityNormal);
170 }
171
172 void IdoMysqlConnection::InternalNewTransaction()
173 {
174         AssertOnWorkQueue();
175
176         if (!GetConnected())
177                 return;
178
179         AsyncQuery("COMMIT");
180         AsyncQuery("BEGIN");
181 }
182
183 void IdoMysqlConnection::ReconnectTimerHandler()
184 {
185 #ifdef I2_DEBUG /* I2_DEBUG */
186         Log(LogDebug, "IdoMysqlConnection")
187                 << "Scheduling reconnect task.";
188 #endif /* I2_DEBUG */
189
190         /* Only allow Reconnect events with high priority. */
191         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Reconnect, this), PriorityImmediate);
192 }
193
194 void IdoMysqlConnection::Reconnect()
195 {
196         AssertOnWorkQueue();
197
198         if (!IsActive())
199                 return;
200
201         CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'");
202
203         double startTime = Utility::GetTime();
204
205         SetShouldConnect(true);
206
207         bool reconnect = false;
208
209         /* Ensure to close old connections first. */
210         if (GetConnected()) {
211                 /* Check if we're really still connected */
212                 if (m_Mysql->ping(&m_Connection) == 0)
213                         return;
214
215                 m_Mysql->close(&m_Connection);
216                 SetConnected(false);
217                 reconnect = true;
218         }
219
220         Log(LogDebug, "IdoMysqlConnection")
221                 << "Reconnect: Clearing ID cache.";
222
223         ClearIDCache();
224
225         String ihost, isocket_path, iuser, ipasswd, idb;
226         String isslKey, isslCert, isslCa, isslCaPath, isslCipher;
227         const char *host, *socket_path, *user , *passwd, *db;
228         const char *sslKey, *sslCert, *sslCa, *sslCaPath, *sslCipher;
229         bool enableSsl;
230         long port;
231
232         ihost = GetHost();
233         isocket_path = GetSocketPath();
234         iuser = GetUser();
235         ipasswd = GetPassword();
236         idb = GetDatabase();
237
238         enableSsl = GetEnableSsl();
239         isslKey = GetSslKey();
240         isslCert = GetSslCert();
241         isslCa = GetSslCa();
242         isslCaPath = GetSslCapath();
243         isslCipher = GetSslCipher();
244
245         host = (!ihost.IsEmpty()) ? ihost.CStr() : nullptr;
246         port = GetPort();
247         socket_path = (!isocket_path.IsEmpty()) ? isocket_path.CStr() : nullptr;
248         user = (!iuser.IsEmpty()) ? iuser.CStr() : nullptr;
249         passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : nullptr;
250         db = (!idb.IsEmpty()) ? idb.CStr() : nullptr;
251
252         sslKey = (!isslKey.IsEmpty()) ? isslKey.CStr() : nullptr;
253         sslCert = (!isslCert.IsEmpty()) ? isslCert.CStr() : nullptr;
254         sslCa = (!isslCa.IsEmpty()) ? isslCa.CStr() : nullptr;
255         sslCaPath = (!isslCaPath.IsEmpty()) ? isslCaPath.CStr() : nullptr;
256         sslCipher = (!isslCipher.IsEmpty()) ? isslCipher.CStr() : nullptr;
257
258         /* connection */
259         if (!m_Mysql->init(&m_Connection)) {
260                 Log(LogCritical, "IdoMysqlConnection")
261                         << "mysql_init() failed: out of memory";
262
263                 BOOST_THROW_EXCEPTION(std::bad_alloc());
264         }
265
266         if (enableSsl)
267                 m_Mysql->ssl_set(&m_Connection, sslKey, sslCert, sslCa, sslCaPath, sslCipher);
268
269         if (!m_Mysql->real_connect(&m_Connection, host, user, passwd, db, port, socket_path, CLIENT_FOUND_ROWS | CLIENT_MULTI_STATEMENTS)) {
270                 Log(LogCritical, "IdoMysqlConnection")
271                         << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
272                         << "' " << (enableSsl ? "(SSL enabled) " : "") << "failed: \"" << m_Mysql->error(&m_Connection) << "\"";
273
274                 BOOST_THROW_EXCEPTION(std::runtime_error(m_Mysql->error(&m_Connection)));
275         }
276
277         Log(LogNotice, "IdoMysqlConnection")
278                 << "Reconnect: '" << GetName() << "' is now connected to database '" << GetDatabase() << "'.";
279
280         SetConnected(true);
281
282         IdoMysqlResult result = Query("SELECT @@global.max_allowed_packet AS max_allowed_packet");
283
284         Dictionary::Ptr row = FetchRow(result);
285
286         if (row)
287                 m_MaxPacketSize = row->Get("max_allowed_packet");
288         else
289                 m_MaxPacketSize = 64 * 1024;
290
291         DiscardRows(result);
292
293         String dbVersionName = "idoutils";
294         result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
295
296         row = FetchRow(result);
297
298         if (!row) {
299                 m_Mysql->close(&m_Connection);
300                 SetConnected(false);
301
302                 Log(LogCritical, "IdoMysqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
303
304                 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
305         }
306
307         DiscardRows(result);
308
309         String version = row->Get("version");
310
311         SetSchemaVersion(version);
312
313         if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
314                 m_Mysql->close(&m_Connection);
315                 SetConnected(false);
316
317                 Log(LogCritical, "IdoMysqlConnection")
318                         << "Schema version '" << version << "' does not match the required version '"
319                         << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation at "
320                         << "https://docs.icinga.com/icinga2/latest/doc/module/icinga2/chapter/upgrading-icinga-2#upgrading-mysql-db";
321
322                 BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
323         }
324
325         String instanceName = GetInstanceName();
326
327         result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
328         row = FetchRow(result);
329
330         if (!row) {
331                 Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
332                 m_InstanceID = GetLastInsertID();
333         } else {
334                 m_InstanceID = DbReference(row->Get("instance_id"));
335         }
336
337         DiscardRows(result);
338
339         Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
340
341         /* we have an endpoint in a cluster setup, so decide if we can proceed here */
342         if (my_endpoint && GetHAMode() == HARunOnce) {
343                 /* get the current endpoint writing to programstatus table */
344                 result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
345                         GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
346                 row = FetchRow(result);
347                 DiscardRows(result);
348
349                 String endpoint_name;
350
351                 if (row)
352                         endpoint_name = row->Get("endpoint_name");
353                 else
354                         Log(LogNotice, "IdoMysqlConnection", "Empty program status table");
355
356                 /* if we did not write into the database earlier, another instance is active */
357                 if (endpoint_name != my_endpoint->GetName()) {
358                         double status_update_time;
359
360                         if (row)
361                                 status_update_time = row->Get("status_update_time");
362                         else
363                                 status_update_time = 0;
364
365                         double now = Utility::GetTime();
366
367                         double status_update_age = now - status_update_time;
368                         double failoverTimeout = GetFailoverTimeout();
369
370                         if (status_update_age < failoverTimeout) {
371                                 Log(LogInformation, "IdoMysqlConnection")
372                                         << "Last update by endpoint '" << endpoint_name << "' was "
373                                         << status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying.";
374
375                                 m_Mysql->close(&m_Connection);
376                                 SetConnected(false);
377                                 SetShouldConnect(false);
378
379                                 return;
380                         }
381
382                         /* activate the IDO only, if we're authoritative in this zone */
383                         if (IsPaused()) {
384                                 Log(LogNotice, "IdoMysqlConnection")
385                                         << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
386
387                                 m_Mysql->close(&m_Connection);
388                                 SetConnected(false);
389
390                                 return;
391                         }
392
393                         SetLastFailover(now);
394
395                         Log(LogInformation, "IdoMysqlConnection")
396                                 << "Last update by endpoint '" << endpoint_name << "' was "
397                                 << status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'.";
398                 }
399
400                 Log(LogNotice, "IdoMysqlConnection", "Enabling IDO connection in HA zone.");
401         }
402
403         Log(LogInformation, "IdoMysqlConnection")
404                 << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
405
406         /* set session time zone to utc */
407         Query("SET SESSION TIME_ZONE='+00:00'");
408
409         Query("SET SESSION SQL_MODE='NO_AUTO_VALUE_ON_ZERO'");
410
411         Query("BEGIN");
412
413         /* update programstatus table */
414         UpdateProgramStatus();
415
416         /* record connection */
417         Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
418                 "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
419                 + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), 'icinga2 db_ido_mysql', '" + Escape(Application::GetAppVersion())
420                 + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
421
422         /* clear config tables for the initial config dump */
423         PrepareDatabase();
424
425         std::ostringstream q1buf;
426         q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
427         result = Query(q1buf.str());
428
429         std::vector<DbObject::Ptr> activeDbObjs;
430
431         while ((row = FetchRow(result))) {
432                 DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
433
434                 if (!dbtype)
435                         continue;
436
437                 DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
438                 SetObjectID(dbobj, DbReference(row->Get("object_id")));
439                 bool active = row->Get("is_active");
440                 SetObjectActive(dbobj, active);
441
442                 if (active)
443                         activeDbObjs.emplace_back(std::move(dbobj));
444         }
445
446         SetIDCacheValid(true);
447
448         EnableActiveChangedHandler();
449
450         for (const DbObject::Ptr& dbobj : activeDbObjs) {
451                 if (dbobj->GetObject())
452                         continue;
453
454                 Log(LogNotice, "IdoMysqlConnection")
455                         << "Deactivate deleted object name1: '" << dbobj->GetName1()
456                         << "' name2: '" << dbobj->GetName2() + "'.";
457                 DeactivateObject(dbobj);
458         }
459
460         UpdateAllObjects();
461
462 #ifdef I2_DEBUG /* I2_DEBUG */
463         Log(LogDebug, "IdoMysqlConnection")
464                 << "Scheduling session table clear and finish connect task.";
465 #endif /* I2_DEBUG */
466
467         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::ClearTablesBySession, this), PriorityNormal);
468
469         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::FinishConnect, this, startTime), PriorityNormal);
470 }
471
472 void IdoMysqlConnection::FinishConnect(double startTime)
473 {
474         AssertOnWorkQueue();
475
476         if (!GetConnected())
477                 return;
478
479         FinishAsyncQueries();
480
481         Log(LogInformation, "IdoMysqlConnection")
482                 << "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
483                 << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
484
485         Query("COMMIT");
486         Query("BEGIN");
487 }
488
489 void IdoMysqlConnection::ClearTablesBySession()
490 {
491         /* delete all comments and downtimes without current session token */
492         ClearTableBySession("comments");
493         ClearTableBySession("scheduleddowntime");
494 }
495
496 void IdoMysqlConnection::ClearTableBySession(const String& table)
497 {
498         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
499                 Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
500                 Convert::ToString(GetSessionToken()));
501 }
502
503 void IdoMysqlConnection::AsyncQuery(const String& query, const std::function<void (const IdoMysqlResult&)>& callback)
504 {
505         AssertOnWorkQueue();
506
507         IdoAsyncQuery aq;
508         aq.Query = query;
509         /* XXX: Important: The callback must not immediately execute a query, but enqueue it!
510          * See https://github.com/Icinga/icinga2/issues/4603 for details.
511          */
512         aq.Callback = callback;
513         m_AsyncQueries.emplace_back(std::move(aq));
514
515         if (m_AsyncQueries.size() > 25000) {
516                 FinishAsyncQueries();
517                 InternalNewTransaction();
518         }
519 }
520
521 void IdoMysqlConnection::FinishAsyncQueries()
522 {
523         std::vector<IdoAsyncQuery> queries;
524         m_AsyncQueries.swap(queries);
525
526         std::vector<IdoAsyncQuery>::size_type offset = 0;
527
528         while (offset < queries.size()) {
529                 std::ostringstream querybuf;
530
531                 std::vector<IdoAsyncQuery>::size_type count = 0;
532                 size_t num_bytes = 0;
533
534                 for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
535                         const IdoAsyncQuery& aq = queries[i];
536
537                         size_t size_query = aq.Query.GetLength() + 1;
538
539                         if (count > 0) {
540                                 if (num_bytes + size_query > m_MaxPacketSize - 512)
541                                         break;
542
543                                 querybuf << ";";
544                         }
545
546                         IncreaseQueryCount();
547                         count++;
548
549                         Log(LogDebug, "IdoMysqlConnection")
550                                 << "Query: " << aq.Query;
551
552                         querybuf << aq.Query;
553                         num_bytes += size_query;
554                 }
555
556                 String query = querybuf.str();
557
558                 if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
559                         std::ostringstream msgbuf;
560                         String message = m_Mysql->error(&m_Connection);
561                         msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
562                         Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
563
564                         BOOST_THROW_EXCEPTION(
565                                 database_error()
566                                 << errinfo_message(m_Mysql->error(&m_Connection))
567                                 << errinfo_database_query(query)
568                         );
569                 }
570
571                 for (std::vector<IdoAsyncQuery>::size_type i = offset; i < offset + count; i++) {
572                         const IdoAsyncQuery& aq = queries[i];
573
574                         MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
575
576                         m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
577
578                         IdoMysqlResult iresult;
579
580                         if (!result) {
581                                 if (m_Mysql->field_count(&m_Connection) > 0) {
582                                         std::ostringstream msgbuf;
583                                         String message = m_Mysql->error(&m_Connection);
584                                         msgbuf << "Error \"" << message << "\" when executing query \"" << aq.Query << "\"";
585                                         Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
586
587                                         BOOST_THROW_EXCEPTION(
588                                                 database_error()
589                                                 << errinfo_message(m_Mysql->error(&m_Connection))
590                                                 << errinfo_database_query(query)
591                                         );
592                                 }
593                         } else
594                                 iresult = IdoMysqlResult(result, std::bind(&MysqlInterface::free_result, std::cref(m_Mysql), _1));
595
596                         if (aq.Callback)
597                                 aq.Callback(iresult);
598
599                         if (m_Mysql->next_result(&m_Connection) > 0) {
600                                 std::ostringstream msgbuf;
601                                 String message = m_Mysql->error(&m_Connection);
602                                 msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
603                                 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
604
605                                 BOOST_THROW_EXCEPTION(
606                                         database_error()
607                                         << errinfo_message(m_Mysql->error(&m_Connection))
608                                         << errinfo_database_query(query)
609                                 );
610                         }
611                 }
612
613                 offset += count;
614         }
615 }
616
617 IdoMysqlResult IdoMysqlConnection::Query(const String& query)
618 {
619         AssertOnWorkQueue();
620
621         /* finish all async queries to maintain the right order for queries */
622         FinishAsyncQueries();
623
624         Log(LogDebug, "IdoMysqlConnection")
625                 << "Query: " << query;
626
627         IncreaseQueryCount();
628
629         if (m_Mysql->query(&m_Connection, query.CStr()) != 0) {
630                 std::ostringstream msgbuf;
631                 String message = m_Mysql->error(&m_Connection);
632                 msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
633                 Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
634
635                 BOOST_THROW_EXCEPTION(
636                         database_error()
637                         << errinfo_message(m_Mysql->error(&m_Connection))
638                         << errinfo_database_query(query)
639                 );
640         }
641
642         MYSQL_RES *result = m_Mysql->store_result(&m_Connection);
643
644         m_AffectedRows = m_Mysql->affected_rows(&m_Connection);
645
646         if (!result) {
647                 if (m_Mysql->field_count(&m_Connection) > 0) {
648                         std::ostringstream msgbuf;
649                         String message = m_Mysql->error(&m_Connection);
650                         msgbuf << "Error \"" << message << "\" when executing query \"" << query << "\"";
651                         Log(LogCritical, "IdoMysqlConnection", msgbuf.str());
652
653                         BOOST_THROW_EXCEPTION(
654                                 database_error()
655                                 << errinfo_message(m_Mysql->error(&m_Connection))
656                                 << errinfo_database_query(query)
657                         );
658                 }
659
660                 return IdoMysqlResult();
661         }
662
663         return IdoMysqlResult(result, std::bind(&MysqlInterface::free_result, std::cref(m_Mysql), _1));
664 }
665
666 DbReference IdoMysqlConnection::GetLastInsertID()
667 {
668         AssertOnWorkQueue();
669
670         return {static_cast<long>(m_Mysql->insert_id(&m_Connection))};
671 }
672
673 int IdoMysqlConnection::GetAffectedRows()
674 {
675         AssertOnWorkQueue();
676
677         return m_AffectedRows;
678 }
679
680 String IdoMysqlConnection::Escape(const String& s)
681 {
682         AssertOnWorkQueue();
683
684         String utf8s = Utility::ValidateUTF8(s);
685
686         size_t length = utf8s.GetLength();
687         auto *to = new char[utf8s.GetLength() * 2 + 1];
688
689         m_Mysql->real_escape_string(&m_Connection, to, utf8s.CStr(), length);
690
691         String result = String(to);
692
693         delete [] to;
694
695         return result;
696 }
697
698 Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result)
699 {
700         AssertOnWorkQueue();
701
702         MYSQL_ROW row;
703         MYSQL_FIELD *field;
704         unsigned long *lengths, i;
705
706         row = m_Mysql->fetch_row(result.get());
707
708         if (!row)
709                 return nullptr;
710
711         lengths = m_Mysql->fetch_lengths(result.get());
712
713         if (!lengths)
714                 return nullptr;
715
716         Dictionary::Ptr dict = new Dictionary();
717
718         m_Mysql->field_seek(result.get(), 0);
719         for (field = m_Mysql->fetch_field(result.get()), i = 0; field; field = m_Mysql->fetch_field(result.get()), i++)
720                 dict->Set(field->name, String(row[i], row[i] + lengths[i]));
721
722         return dict;
723 }
724
725 void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
726 {
727         Dictionary::Ptr row;
728
729         while ((row = FetchRow(result)))
730                 ; /* empty loop body */
731 }
732
733 void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
734 {
735         if (IsPaused())
736                 return;
737
738 #ifdef I2_DEBUG /* I2_DEBUG */
739         Log(LogDebug, "IdoMysqlConnection")
740                 << "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
741 #endif /* I2_DEBUG */
742
743         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityNormal);
744 }
745
746 void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
747 {
748         AssertOnWorkQueue();
749
750         if (IsPaused())
751                 return;
752
753         if (!GetConnected())
754                 return;
755
756         DbReference dbref = GetObjectID(dbobj);
757         std::ostringstream qbuf;
758
759         if (!dbref.IsValid()) {
760                 if (!dbobj->GetName2().IsEmpty()) {
761                         qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
762                                 << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
763                                 << "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)";
764                 } else {
765                         qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
766                                 << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
767                                 << "'" << Escape(dbobj->GetName1()) << "', 1)";
768                 }
769
770                 Query(qbuf.str());
771                 SetObjectID(dbobj, GetLastInsertID());
772         } else {
773                 qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
774                 AsyncQuery(qbuf.str());
775         }
776 }
777
778 void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
779 {
780         if (IsPaused())
781                 return;
782
783 #ifdef I2_DEBUG /* I2_DEBUG */
784         Log(LogDebug, "IdoMysqlConnection")
785                 << "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
786 #endif /* I2_DEBUG */
787
788         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityNormal);
789 }
790
791 void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
792 {
793         AssertOnWorkQueue();
794
795         if (IsPaused())
796                 return;
797
798         if (!GetConnected())
799                 return;
800
801         DbReference dbref = GetObjectID(dbobj);
802
803         if (!dbref.IsValid())
804                 return;
805
806         std::ostringstream qbuf;
807         qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
808         AsyncQuery(qbuf.str());
809
810         /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
811          * because the object is still in the database. */
812 }
813
814 bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
815 {
816         if (key == "instance_id") {
817                 *result = static_cast<long>(m_InstanceID);
818                 return true;
819         } else if (key == "session_token") {
820                 *result = GetSessionToken();
821                 return true;
822         }
823
824         Value rawvalue = DbValue::ExtractValue(value);
825
826         if (rawvalue.IsObjectType<ConfigObject>()) {
827                 DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
828
829                 if (!dbobjcol) {
830                         *result = 0;
831                         return true;
832                 }
833
834                 if (!IsIDCacheValid())
835                         return false;
836
837                 DbReference dbrefcol;
838
839                 if (DbValue::IsObjectInsertID(value)) {
840                         dbrefcol = GetInsertID(dbobjcol);
841
842                         if (!dbrefcol.IsValid())
843                                 return false;
844                 } else {
845                         dbrefcol = GetObjectID(dbobjcol);
846
847                         if (!dbrefcol.IsValid()) {
848                                 InternalActivateObject(dbobjcol);
849
850                                 dbrefcol = GetObjectID(dbobjcol);
851
852                                 if (!dbrefcol.IsValid())
853                                         return false;
854                         }
855                 }
856
857                 *result = static_cast<long>(dbrefcol);
858         } else if (DbValue::IsTimestamp(value)) {
859                 long ts = rawvalue;
860                 std::ostringstream msgbuf;
861                 msgbuf << "FROM_UNIXTIME(" << ts << ")";
862                 *result = Value(msgbuf.str());
863         } else if (DbValue::IsObjectInsertID(value)) {
864                 auto id = static_cast<long>(rawvalue);
865
866                 if (id <= 0)
867                         return false;
868
869                 *result = id;
870                 return true;
871         } else {
872                 Value fvalue;
873
874                 if (rawvalue.IsBoolean())
875                         fvalue = Convert::ToLong(rawvalue);
876                 else
877                         fvalue = rawvalue;
878
879                 *result = "'" + Escape(fvalue) + "'";
880         }
881
882         return true;
883 }
884
885 void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
886 {
887         if (IsPaused())
888                 return;
889
890         ASSERT(query.Category != DbCatInvalid);
891
892 #ifdef I2_DEBUG /* I2_DEBUG */
893         Log(LogDebug, "IdoMysqlConnection")
894                 << "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
895 #endif /* I2_DEBUG */
896
897         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
898 }
899
900 void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
901 {
902         if (IsPaused())
903                 return;
904
905         if (queries.empty())
906                 return;
907
908 #ifdef I2_DEBUG /* I2_DEBUG */
909         Log(LogDebug, "IdoMysqlConnection")
910                 << "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
911 #endif /* I2_DEBUG */
912
913         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
914 }
915
916 bool IdoMysqlConnection::CanExecuteQuery(const DbQuery& query)
917 {
918         if (query.Object && !IsIDCacheValid())
919                 return false;
920
921         if (query.WhereCriteria) {
922                 ObjectLock olock(query.WhereCriteria);
923                 Value value;
924
925                 for (const Dictionary::Pair& kv : query.WhereCriteria) {
926                         if (!FieldToEscapedString(kv.first, kv.second, &value))
927                                 return false;
928                 }
929         }
930
931         if (query.Fields) {
932                 ObjectLock olock(query.Fields);
933
934                 for (const Dictionary::Pair& kv : query.Fields) {
935                         Value value;
936
937                         if (kv.second.IsEmpty() && !kv.second.IsString())
938                                 continue;
939
940                         if (!FieldToEscapedString(kv.first, kv.second, &value))
941                                 return false;
942                 }
943         }
944
945         return true;
946 }
947
948 void IdoMysqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
949 {
950         AssertOnWorkQueue();
951
952         if (IsPaused())
953                 return;
954
955         if (!GetConnected())
956                 return;
957
958         for (const DbQuery& query : queries) {
959                 ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
960
961                 if (!CanExecuteQuery(query)) {
962
963 #ifdef I2_DEBUG /* I2_DEBUG */
964                         Log(LogDebug, "IdoMysqlConnection")
965                                 << "Scheduling multiple execute query task again: Cannot execute query now. Type '"
966                                 << query.Type << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
967 #endif /* I2_DEBUG */
968
969                         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), query.Priority);
970                         return;
971                 }
972         }
973
974         for (const DbQuery& query : queries) {
975                 InternalExecuteQuery(query);
976         }
977 }
978
979 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
980 {
981         AssertOnWorkQueue();
982
983         if (IsPaused())
984                 return;
985
986         if (!GetConnected())
987                 return;
988
989         if (query.Type == DbQueryNewTransaction) {
990                 InternalNewTransaction();
991                 return;
992         }
993
994         /* check whether we're allowed to execute the query first */
995         if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0)
996                 return;
997
998         if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
999                 return;
1000
1001         /* check if there are missing object/insert ids and re-enqueue the query */
1002         if (!CanExecuteQuery(query)) {
1003
1004 #ifdef I2_DEBUG /* I2_DEBUG */
1005                 Log(LogDebug, "IdoMysqlConnection")
1006                         << "Scheduling execute query task again: Cannot execute query now. Type '"
1007                         << typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1008 #endif /* I2_DEBUG */
1009
1010                 m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, typeOverride), query.Priority);
1011                 return;
1012         }
1013
1014         std::ostringstream qbuf, where;
1015         int type;
1016
1017         if (query.WhereCriteria) {
1018                 where << " WHERE ";
1019
1020                 ObjectLock olock(query.WhereCriteria);
1021                 Value value;
1022                 bool first = true;
1023
1024                 for (const Dictionary::Pair& kv : query.WhereCriteria) {
1025                         if (!FieldToEscapedString(kv.first, kv.second, &value)) {
1026
1027 #ifdef I2_DEBUG /* I2_DEBUG */
1028                                 Log(LogDebug, "IdoMysqlConnection")
1029                                         << "Scheduling execute query task again: Cannot execute query now. Type '"
1030                                         << typeOverride << "', table '" << query.Table << "', queue size: '" << GetPendingQueryCount() << "'.";
1031 #endif /* I2_DEBUG */
1032
1033                                 m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
1034                                 return;
1035                         }
1036
1037                         if (!first)
1038                                 where << " AND ";
1039
1040                         where << kv.first << " = " << value;
1041
1042                         if (first)
1043                                 first = false;
1044                 }
1045         }
1046
1047         type = (typeOverride != -1) ? typeOverride : query.Type;
1048
1049         bool upsert = false;
1050
1051         if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
1052                 bool hasid = false;
1053
1054                 if (query.Object) {
1055                         if (query.ConfigUpdate)
1056                                 hasid = GetConfigUpdate(query.Object);
1057                         else if (query.StatusUpdate)
1058                                 hasid = GetStatusUpdate(query.Object);
1059                 }
1060
1061                 if (!hasid)
1062                         upsert = true;
1063
1064                 type = DbQueryUpdate;
1065         }
1066
1067         if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
1068                 std::ostringstream qdel;
1069                 qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
1070                 AsyncQuery(qdel.str());
1071
1072                 type = DbQueryInsert;
1073         }
1074
1075         switch (type) {
1076                 case DbQueryInsert:
1077                         qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
1078                         break;
1079                 case DbQueryUpdate:
1080                         qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
1081                         break;
1082                 case DbQueryDelete:
1083                         qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
1084                         break;
1085                 default:
1086                         VERIFY(!"Invalid query type.");
1087         }
1088
1089         if (type == DbQueryInsert || type == DbQueryUpdate) {
1090                 std::ostringstream colbuf, valbuf;
1091
1092                 if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
1093                         return;
1094
1095                 ObjectLock olock(query.Fields);
1096
1097                 bool first = true;
1098                 for (const Dictionary::Pair& kv : query.Fields) {
1099                         Value value;
1100
1101                         if (kv.second.IsEmpty() && !kv.second.IsString())
1102                                 continue;
1103
1104                         if (!FieldToEscapedString(kv.first, kv.second, &value)) {
1105
1106 #ifdef I2_DEBUG /* I2_DEBUG */
1107                                 Log(LogDebug, "IdoMysqlConnection")
1108                                         << "Scheduling execute query task again: Cannot extract required INSERT/UPDATE fields, key '"
1109                                         << kv.first << "', val '" << kv.second << "', type " << typeOverride << ", table '" << query.Table << "'.";
1110 #endif /* I2_DEBUG */
1111
1112                                 m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
1113                                 return;
1114                         }
1115
1116                         if (type == DbQueryInsert) {
1117                                 if (!first) {
1118                                         colbuf << ", ";
1119                                         valbuf << ", ";
1120                                 }
1121
1122                                 colbuf << kv.first;
1123                                 valbuf << value;
1124                         } else {
1125                                 if (!first)
1126                                         qbuf << ", ";
1127
1128                                 qbuf << " " << kv.first << " = " << value;
1129                         }
1130
1131                         if (first)
1132                                 first = false;
1133                 }
1134
1135                 if (type == DbQueryInsert)
1136                         qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
1137         }
1138
1139         if (type != DbQueryInsert)
1140                 qbuf << where.str();
1141
1142         AsyncQuery(qbuf.str(), std::bind(&IdoMysqlConnection::FinishExecuteQuery, this, query, type, upsert));
1143 }
1144
1145 void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool upsert)
1146 {
1147         if (upsert && GetAffectedRows() == 0) {
1148
1149 #ifdef I2_DEBUG /* I2_DEBUG */
1150                 Log(LogDebug, "IdoMysqlConnection")
1151                         << "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
1152 #endif /* I2_DEBUG */
1153
1154                 m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority);
1155
1156                 return;
1157         }
1158
1159         if (type == DbQueryInsert && query.Object) {
1160                 if (query.ConfigUpdate) {
1161                         SetInsertID(query.Object, GetLastInsertID());
1162                         SetConfigUpdate(query.Object, true);
1163                 } else if (query.StatusUpdate)
1164                         SetStatusUpdate(query.Object, true);
1165         }
1166
1167         if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID)
1168                 query.NotificationInsertID->SetValue(static_cast<long>(GetLastInsertID()));
1169 }
1170
1171 void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
1172 {
1173         if (IsPaused())
1174                 return;
1175
1176 #ifdef I2_DEBUG /* I2_DEBUG */
1177                 Log(LogDebug, "IdoMysqlConnection")
1178                         << "Rescheduling cleanup query for table '" << table << "' and column '"
1179                         << time_column << "'. max_age is set to '" << max_age << "'.";
1180 #endif /* I2_DEBUG */
1181
1182         m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
1183 }
1184
1185 void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
1186 {
1187         AssertOnWorkQueue();
1188
1189         if (IsPaused())
1190                 return;
1191
1192         if (!GetConnected())
1193                 return;
1194
1195         AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
1196                 Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
1197                 " < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
1198 }
1199
1200 void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
1201 {
1202         String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
1203         IdoMysqlResult result = Query(query);
1204
1205         Dictionary::Ptr row;
1206
1207         while ((row = FetchRow(result))) {
1208                 DbReference dbref(row->Get("object_id"));
1209                 SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
1210                 SetConfigHash(type, dbref, row->Get("config_hash"));
1211         }
1212 }
1213
1214 int IdoMysqlConnection::GetPendingQueryCount() const
1215 {
1216         return m_QueryQueue.GetLength();
1217 }