]> granicus.if.org Git - icinga2/blob - lib/db_ido_pgsql/idopgsqlconnection.cpp
Merge pull request #7065 from uubk/logrotate-fix
[icinga2] / lib / db_ido_pgsql / idopgsqlconnection.cpp
1 /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2
3 #include "db_ido_pgsql/idopgsqlconnection.hpp"
4 #include "db_ido_pgsql/idopgsqlconnection-ti.cpp"
5 #include "db_ido/dbtype.hpp"
6 #include "db_ido/dbvalue.hpp"
7 #include "base/logger.hpp"
8 #include "base/objectlock.hpp"
9 #include "base/convert.hpp"
10 #include "base/utility.hpp"
11 #include "base/perfdatavalue.hpp"
12 #include "base/application.hpp"
13 #include "base/configtype.hpp"
14 #include "base/exception.hpp"
15 #include "base/context.hpp"
16 #include "base/statsfunction.hpp"
17 #include <utility>
18
19 using namespace icinga;
20
21 REGISTER_TYPE(IdoPgsqlConnection);
22
23 REGISTER_STATSFUNCTION(IdoPgsqlConnection, &IdoPgsqlConnection::StatsFunc);
24
25 IdoPgsqlConnection::IdoPgsqlConnection()
26 {
27         m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName());
28 }
29
30 void IdoPgsqlConnection::OnConfigLoaded()
31 {
32         ObjectImpl<IdoPgsqlConnection>::OnConfigLoaded();
33
34         m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName());
35
36         Library shimLibrary{"pgsql_shim"};
37
38         auto create_pgsql_shim = shimLibrary.GetSymbolAddress<create_pgsql_shim_ptr>("create_pgsql_shim");
39
40         m_Pgsql.reset(create_pgsql_shim());
41
42         std::swap(m_Library, shimLibrary);
43 }
44
45 void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
46 {
47         DictionaryData nodes;
48
49         for (const IdoPgsqlConnection::Ptr& idopgsqlconnection : ConfigType::GetObjectsByType<IdoPgsqlConnection>()) {
50                 size_t queryQueueItems = idopgsqlconnection->m_QueryQueue.GetLength();
51                 double queryQueueItemRate = idopgsqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0;
52
53                 nodes.emplace_back(idopgsqlconnection->GetName(), new Dictionary({
54                         { "version", idopgsqlconnection->GetSchemaVersion() },
55                         { "instance_name", idopgsqlconnection->GetInstanceName() },
56                         { "connected", idopgsqlconnection->GetConnected() },
57                         { "query_queue_items", queryQueueItems },
58                         { "query_queue_item_rate", queryQueueItemRate }
59                 }));
60
61                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_rate", idopgsqlconnection->GetQueryCount(60) / 60.0));
62                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_1min", idopgsqlconnection->GetQueryCount(60)));
63                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_5mins", idopgsqlconnection->GetQueryCount(5 * 60)));
64                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_15mins", idopgsqlconnection->GetQueryCount(15 * 60)));
65                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_items", queryQueueItems));
66                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate));
67         }
68
69         status->Set("idopgsqlconnection", new Dictionary(std::move(nodes)));
70 }
71
72 void IdoPgsqlConnection::Resume()
73 {
74         Log(LogInformation, "IdoPgsqlConnection")
75                 << "'" << GetName() << "' resumed.";
76
77         SetConnected(false);
78
79         m_QueryQueue.SetExceptionCallback(std::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1));
80
81         /* Immediately try to connect on Resume() without timer. */
82         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityImmediate);
83
84         m_TxTimer = new Timer();
85         m_TxTimer->SetInterval(1);
86         m_TxTimer->OnTimerExpired.connect(std::bind(&IdoPgsqlConnection::TxTimerHandler, this));
87         m_TxTimer->Start();
88
89         m_ReconnectTimer = new Timer();
90         m_ReconnectTimer->SetInterval(10);
91         m_ReconnectTimer->OnTimerExpired.connect(std::bind(&IdoPgsqlConnection::ReconnectTimerHandler, this));
92         m_ReconnectTimer->Start();
93
94         /* Start with queries after connect. */
95         DbConnection::Resume();
96
97         ASSERT(m_Pgsql->isthreadsafe());
98 }
99
100 void IdoPgsqlConnection::Pause()
101 {
102         DbConnection::Pause();
103
104         m_ReconnectTimer.reset();
105
106         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Disconnect, this), PriorityLow);
107
108         /* Work on remaining tasks but never delete the threads, for HA resuming later. */
109         m_QueryQueue.Join();
110
111         Log(LogInformation, "IdoPgsqlConnection")
112                 << "'" << GetName() << "' paused.";
113 }
114
115 void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
116 {
117         Log(LogWarning, "IdoPgsqlConnection", "Exception during database operation: Verify that your database is operational!");
118
119         Log(LogDebug, "IdoPgsqlConnection")
120                 << "Exception during database operation: " << DiagnosticInformation(std::move(exp));
121
122         if (GetConnected()) {
123                 m_Pgsql->finish(m_Connection);
124                 SetConnected(false);
125         }
126 }
127
128 void IdoPgsqlConnection::AssertOnWorkQueue()
129 {
130         ASSERT(m_QueryQueue.IsWorkerThread());
131 }
132
133 void IdoPgsqlConnection::Disconnect()
134 {
135         AssertOnWorkQueue();
136
137         if (!GetConnected())
138                 return;
139
140         Query("COMMIT");
141
142         m_Pgsql->finish(m_Connection);
143         SetConnected(false);
144
145         Log(LogInformation, "IdoPgsqlConnection")
146                 << "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'.";
147 }
148
149 void IdoPgsqlConnection::TxTimerHandler()
150 {
151         NewTransaction();
152 }
153
154 void IdoPgsqlConnection::NewTransaction()
155 {
156         if (IsPaused())
157                 return;
158
159         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalNewTransaction, this), PriorityNormal, true);
160 }
161
162 void IdoPgsqlConnection::InternalNewTransaction()
163 {
164         AssertOnWorkQueue();
165
166         if (!GetConnected())
167                 return;
168
169         Query("COMMIT");
170         Query("BEGIN");
171 }
172
173 void IdoPgsqlConnection::ReconnectTimerHandler()
174 {
175         /* Only allow Reconnect events with high priority. */
176         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityHigh);
177 }
178
179 void IdoPgsqlConnection::Reconnect()
180 {
181         AssertOnWorkQueue();
182
183         CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'");
184
185         double startTime = Utility::GetTime();
186
187         SetShouldConnect(true);
188
189         bool reconnect = false;
190
191         if (GetConnected()) {
192                 /* Check if we're really still connected */
193                 try {
194                         Query("SELECT 1");
195                         return;
196                 } catch (const std::exception&) {
197                         m_Pgsql->finish(m_Connection);
198                         SetConnected(false);
199                         reconnect = true;
200                 }
201         }
202
203         ClearIDCache();
204
205         String host = GetHost();
206         String port = GetPort();
207         String user = GetUser();
208         String password = GetPassword();
209         String database = GetDatabase();
210
211         String sslMode = GetSslMode();
212         String sslKey = GetSslKey();
213         String sslCert = GetSslCert();
214         String sslCa = GetSslCa();
215
216         String conninfo;
217
218         if (!host.IsEmpty())
219                 conninfo += " host=" + host;
220         if (!port.IsEmpty())
221                 conninfo += " port=" + port;
222         if (!user.IsEmpty())
223                 conninfo += " user=" + user;
224         if (!password.IsEmpty())
225                 conninfo += " password=" + password;
226         if (!database.IsEmpty())
227                 conninfo += " dbname=" + database;
228
229         if (!sslMode.IsEmpty())
230                 conninfo += " sslmode=" + sslMode;
231         if (!sslKey.IsEmpty())
232                 conninfo += " sslkey=" + sslKey;
233         if (!sslCert.IsEmpty())
234                 conninfo += " sslcert=" + sslCert;
235         if (!sslCa.IsEmpty())
236                 conninfo += " sslrootcert=" + sslCa;
237
238         /* connection */
239         m_Connection = m_Pgsql->connectdb(conninfo.CStr());
240
241         if (!m_Connection)
242                 return;
243
244         if (m_Pgsql->status(m_Connection) != CONNECTION_OK) {
245                 String message = m_Pgsql->errorMessage(m_Connection);
246                 m_Pgsql->finish(m_Connection);
247                 SetConnected(false);
248
249                 Log(LogCritical, "IdoPgsqlConnection")
250                         << "Connection to database '" << database << "' with user '" << user << "' on '" << host << ":" << port
251                         << "' failed: \"" << message << "\"";
252
253                 BOOST_THROW_EXCEPTION(std::runtime_error(message));
254         }
255
256         SetConnected(true);
257
258         IdoPgsqlResult result;
259
260         /* explicitely require legacy mode for string escaping in PostgreSQL >= 9.1
261          * changing standard_conforming_strings to on by default
262          */
263         if (m_Pgsql->serverVersion(m_Connection) >= 90100)
264                 result = Query("SET standard_conforming_strings TO off");
265
266         String dbVersionName = "idoutils";
267         result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
268
269         Dictionary::Ptr row = FetchRow(result, 0);
270
271         if (!row) {
272                 m_Pgsql->finish(m_Connection);
273                 SetConnected(false);
274
275                 Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
276
277                 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
278         }
279
280         String version = row->Get("version");
281
282         SetSchemaVersion(version);
283
284         if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
285                 m_Pgsql->finish(m_Connection);
286                 SetConnected(false);
287
288                 Log(LogCritical, "IdoPgsqlConnection")
289                         << "Schema version '" << version << "' does not match the required version '"
290                         << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation at "
291                         << "https://docs.icinga.com/icinga2/latest/doc/module/icinga2/chapter/upgrading-icinga-2#upgrading-postgresql-db";
292
293                 BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
294         }
295
296         String instanceName = GetInstanceName();
297
298         result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
299         row = FetchRow(result, 0);
300
301         if (!row) {
302                 Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
303                 m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id");
304         } else {
305                 m_InstanceID = DbReference(row->Get("instance_id"));
306         }
307
308         Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
309
310         /* we have an endpoint in a cluster setup, so decide if we can proceed here */
311         if (my_endpoint && GetHAMode() == HARunOnce) {
312                 /* get the current endpoint writing to programstatus table */
313                 result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
314                         GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
315                 row = FetchRow(result, 0);
316
317                 String endpoint_name;
318
319                 if (row)
320                         endpoint_name = row->Get("endpoint_name");
321                 else
322                         Log(LogNotice, "IdoPgsqlConnection", "Empty program status table");
323
324                 /* if we did not write into the database earlier, another instance is active */
325                 if (endpoint_name != my_endpoint->GetName()) {
326                         double status_update_time;
327
328                         if (row)
329                                 status_update_time = row->Get("status_update_time");
330                         else
331                                 status_update_time = 0;
332
333                         double now = Utility::GetTime();
334
335                         double status_update_age = now - status_update_time;
336                         double failoverTimeout = GetFailoverTimeout();
337
338                         if (status_update_age < GetFailoverTimeout()) {
339                                 Log(LogInformation, "IdoPgsqlConnection")
340                                         << "Last update by endpoint '" << endpoint_name << "' was "
341                                         << status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying.";
342
343                                 m_Pgsql->finish(m_Connection);
344                                 SetConnected(false);
345                                 SetShouldConnect(false);
346
347                                 return;
348                         }
349
350                         /* activate the IDO only, if we're authoritative in this zone */
351                         if (IsPaused()) {
352                                 Log(LogNotice, "IdoPgsqlConnection")
353                                         << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
354
355                                 m_Pgsql->finish(m_Connection);
356                                 SetConnected(false);
357
358                                 return;
359                         }
360
361                         SetLastFailover(now);
362
363                         Log(LogInformation, "IdoPgsqlConnection")
364                                 << "Last update by endpoint '" << endpoint_name << "' was "
365                                 << status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'.";
366                 }
367
368                 Log(LogNotice, "IdoPgsqlConnection", "Enabling IDO connection.");
369         }
370
371         Log(LogInformation, "IdoPgsqlConnection")
372                 << "PGSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')"
373                 << (!sslMode.IsEmpty() ? ", sslmode='" + sslMode + "'" : "");
374
375         Query("BEGIN");
376
377         /* update programstatus table */
378         UpdateProgramStatus();
379
380         /* record connection */
381         Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
382                 "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
383                 + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), E'icinga2 db_ido_pgsql', E'" + Escape(Application::GetAppVersion())
384                 + "', E'" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
385
386         /* clear config tables for the initial config dump */
387         PrepareDatabase();
388
389         std::ostringstream q1buf;
390         q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
391         result = Query(q1buf.str());
392
393         std::vector<DbObject::Ptr> activeDbObjs;
394
395         int index = 0;
396         while ((row = FetchRow(result, index))) {
397                 index++;
398
399                 DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
400
401                 if (!dbtype)
402                         continue;
403
404                 DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
405                 SetObjectID(dbobj, DbReference(row->Get("object_id")));
406                 bool active = row->Get("is_active");
407                 SetObjectActive(dbobj, active);
408
409                 if (active)
410                         activeDbObjs.push_back(dbobj);
411         }
412
413         SetIDCacheValid(true);
414
415         EnableActiveChangedHandler();
416
417         for (const DbObject::Ptr& dbobj : activeDbObjs) {
418                 if (dbobj->GetObject())
419                         continue;
420
421                 Log(LogNotice, "IdoPgsqlConnection")
422                         << "Deactivate deleted object name1: '" << dbobj->GetName1()
423                         << "' name2: '" << dbobj->GetName2() + "'.";
424                 DeactivateObject(dbobj);
425         }
426
427         UpdateAllObjects();
428
429         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityNormal);
430
431         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityNormal);
432 }
433
434 void IdoPgsqlConnection::FinishConnect(double startTime)
435 {
436         AssertOnWorkQueue();
437
438         if (!GetConnected())
439                 return;
440
441         Log(LogInformation, "IdoPgsqlConnection")
442                 << "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
443                 << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
444
445         Query("COMMIT");
446         Query("BEGIN");
447 }
448
449 void IdoPgsqlConnection::ClearTablesBySession()
450 {
451         /* delete all comments and downtimes without current session token */
452         ClearTableBySession("comments");
453         ClearTableBySession("scheduleddowntime");
454 }
455
456 void IdoPgsqlConnection::ClearTableBySession(const String& table)
457 {
458         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
459                 Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
460                 Convert::ToString(GetSessionToken()));
461 }
462
463 IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
464 {
465         AssertOnWorkQueue();
466
467         Log(LogDebug, "IdoPgsqlConnection")
468                 << "Query: " << query;
469
470         IncreaseQueryCount();
471
472         PGresult *result = m_Pgsql->exec(m_Connection, query.CStr());
473
474         if (!result) {
475                 String message = m_Pgsql->errorMessage(m_Connection);
476                 Log(LogCritical, "IdoPgsqlConnection")
477                         << "Error \"" << message << "\" when executing query \"" << query << "\"";
478
479                 BOOST_THROW_EXCEPTION(
480                         database_error()
481                         << errinfo_message(message)
482                         << errinfo_database_query(query)
483                 );
484         }
485
486         char *rowCount = m_Pgsql->cmdTuples(result);
487         m_AffectedRows = atoi(rowCount);
488
489         if (m_Pgsql->resultStatus(result) == PGRES_COMMAND_OK) {
490                 m_Pgsql->clear(result);
491                 return IdoPgsqlResult();
492         }
493
494         if (m_Pgsql->resultStatus(result) != PGRES_TUPLES_OK) {
495                 String message = m_Pgsql->resultErrorMessage(result);
496                 m_Pgsql->clear(result);
497
498                 Log(LogCritical, "IdoPgsqlConnection")
499                         << "Error \"" << message << "\" when executing query \"" << query << "\"";
500
501                 BOOST_THROW_EXCEPTION(
502                         database_error()
503                         << errinfo_message(message)
504                         << errinfo_database_query(query)
505                 );
506         }
507
508         return IdoPgsqlResult(result, std::bind(&PgsqlInterface::clear, std::cref(m_Pgsql), _1));
509 }
510
511 DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column)
512 {
513         AssertOnWorkQueue();
514
515         IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence(E'" + Escape(table) + "', E'" + Escape(column) + "')) AS id");
516
517         Dictionary::Ptr row = FetchRow(result, 0);
518
519         ASSERT(row);
520
521         Log(LogDebug, "IdoPgsqlConnection")
522                 << "Sequence Value: " << row->Get("id");
523
524         return {Convert::ToLong(row->Get("id"))};
525 }
526
527 int IdoPgsqlConnection::GetAffectedRows()
528 {
529         AssertOnWorkQueue();
530
531         return m_AffectedRows;
532 }
533
534 String IdoPgsqlConnection::Escape(const String& s)
535 {
536         AssertOnWorkQueue();
537
538         String utf8s = Utility::ValidateUTF8(s);
539
540         size_t length = utf8s.GetLength();
541         auto *to = new char[utf8s.GetLength() * 2 + 1];
542
543         m_Pgsql->escapeStringConn(m_Connection, to, utf8s.CStr(), length, nullptr);
544
545         String result = String(to);
546
547         delete [] to;
548
549         return result;
550 }
551
552 Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int row)
553 {
554         AssertOnWorkQueue();
555
556         if (row >= m_Pgsql->ntuples(result.get()))
557                 return nullptr;
558
559         int columns = m_Pgsql->nfields(result.get());
560
561         DictionaryData dict;
562
563         for (int column = 0; column < columns; column++) {
564                 Value value;
565
566                 if (!m_Pgsql->getisnull(result.get(), row, column))
567                         value = m_Pgsql->getvalue(result.get(), row, column);
568
569                 dict.emplace_back(m_Pgsql->fname(result.get(), column), value);
570         }
571
572         return new Dictionary(std::move(dict));
573 }
574
575 void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
576 {
577         if (IsPaused())
578                 return;
579
580         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityNormal);
581 }
582
583 void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
584 {
585         AssertOnWorkQueue();
586
587         if (!GetConnected())
588                 return;
589
590         DbReference dbref = GetObjectID(dbobj);
591         std::ostringstream qbuf;
592
593         if (!dbref.IsValid()) {
594                 if (!dbobj->GetName2().IsEmpty()) {
595                         qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
596                                 << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
597                                 << "E'" << Escape(dbobj->GetName1()) << "', E'" << Escape(dbobj->GetName2()) << "', 1)";
598                 } else {
599                         qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
600                                 << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
601                                 << "E'" << Escape(dbobj->GetName1()) << "', 1)";
602                 }
603
604                 Query(qbuf.str());
605                 SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id"));
606         } else {
607                 qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
608                 Query(qbuf.str());
609         }
610 }
611
612 void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
613 {
614         if (IsPaused())
615                 return;
616
617         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityNormal);
618 }
619
620 void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
621 {
622         AssertOnWorkQueue();
623
624         if (!GetConnected())
625                 return;
626
627         DbReference dbref = GetObjectID(dbobj);
628
629         if (!dbref.IsValid())
630                 return;
631
632         std::ostringstream qbuf;
633         qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
634         Query(qbuf.str());
635
636         /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
637          * because the object is still in the database. */
638 }
639
640 bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
641 {
642         if (key == "instance_id") {
643                 *result = static_cast<long>(m_InstanceID);
644                 return true;
645         } else if (key == "session_token") {
646                 *result = GetSessionToken();
647                 return true;
648         }
649
650         Value rawvalue = DbValue::ExtractValue(value);
651
652         if (rawvalue.IsObjectType<ConfigObject>()) {
653                 DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
654
655                 if (!dbobjcol) {
656                         *result = 0;
657                         return true;
658                 }
659
660                 if (!IsIDCacheValid())
661                         return false;
662
663                 DbReference dbrefcol;
664
665                 if (DbValue::IsObjectInsertID(value)) {
666                         dbrefcol = GetInsertID(dbobjcol);
667
668                         if (!dbrefcol.IsValid())
669                                 return false;
670                 } else {
671                         dbrefcol = GetObjectID(dbobjcol);
672
673                         if (!dbrefcol.IsValid()) {
674                                 InternalActivateObject(dbobjcol);
675
676                                 dbrefcol = GetObjectID(dbobjcol);
677
678                                 if (!dbrefcol.IsValid())
679                                         return false;
680                         }
681                 }
682
683                 *result = static_cast<long>(dbrefcol);
684         } else if (DbValue::IsTimestamp(value)) {
685                 long ts = rawvalue;
686                 std::ostringstream msgbuf;
687                 msgbuf << "TO_TIMESTAMP(" << ts << ") AT TIME ZONE 'UTC'";
688                 *result = Value(msgbuf.str());
689         } else if (DbValue::IsObjectInsertID(value)) {
690                 auto id = static_cast<long>(rawvalue);
691
692                 if (id <= 0)
693                         return false;
694
695                 *result = id;
696                 return true;
697         } else {
698                 Value fvalue;
699
700                 if (rawvalue.IsBoolean())
701                         fvalue = Convert::ToLong(rawvalue);
702                 else
703                         fvalue = rawvalue;
704
705                 *result = "E'" + Escape(fvalue) + "'";
706         }
707
708         return true;
709 }
710
711 void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
712 {
713         if (IsPaused())
714                 return;
715
716         ASSERT(query.Category != DbCatInvalid);
717
718         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
719 }
720
721 void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
722 {
723         if (IsPaused())
724                 return;
725
726         if (queries.empty())
727                 return;
728
729         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
730 }
731
732 bool IdoPgsqlConnection::CanExecuteQuery(const DbQuery& query)
733 {
734         if (query.Object && !IsIDCacheValid())
735                 return false;
736
737         if (query.WhereCriteria) {
738                 ObjectLock olock(query.WhereCriteria);
739                 Value value;
740
741                 for (const Dictionary::Pair& kv : query.WhereCriteria) {
742                         if (!FieldToEscapedString(kv.first, kv.second, &value))
743                                 return false;
744                 }
745         }
746
747         if (query.Fields) {
748                 ObjectLock olock(query.Fields);
749
750                 for (const Dictionary::Pair& kv : query.Fields) {
751                         Value value;
752
753                         if (kv.second.IsEmpty() && !kv.second.IsString())
754                                 continue;
755
756                         if (!FieldToEscapedString(kv.first, kv.second, &value))
757                                 return false;
758                 }
759         }
760
761         return true;
762 }
763
764 void IdoPgsqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
765 {
766         AssertOnWorkQueue();
767
768         if (IsPaused())
769                 return;
770
771         if (!GetConnected())
772                 return;
773
774         for (const DbQuery& query : queries) {
775                 ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
776
777                 if (!CanExecuteQuery(query)) {
778                         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), query.Priority);
779                         return;
780                 }
781         }
782
783         for (const DbQuery& query : queries) {
784                 InternalExecuteQuery(query);
785         }
786 }
787
788 void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
789 {
790         AssertOnWorkQueue();
791
792         if (IsPaused())
793                 return;
794
795         if (!GetConnected())
796                 return;
797
798         if (query.Type == DbQueryNewTransaction) {
799                 InternalNewTransaction();
800                 return;
801         }
802
803         /* check whether we're allowed to execute the query first */
804         if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0)
805                 return;
806
807         if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
808                 return;
809
810         /* check if there are missing object/insert ids and re-enqueue the query */
811         if (!CanExecuteQuery(query)) {
812                 m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, typeOverride), query.Priority);
813                 return;
814         }
815
816         std::ostringstream qbuf, where;
817         int type;
818
819         if (query.WhereCriteria) {
820                 where << " WHERE ";
821
822                 ObjectLock olock(query.WhereCriteria);
823                 Value value;
824                 bool first = true;
825
826                 for (const Dictionary::Pair& kv : query.WhereCriteria) {
827                         if (!FieldToEscapedString(kv.first, kv.second, &value)) {
828                                 m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
829                                 return;
830                         }
831
832                         if (!first)
833                                 where << " AND ";
834
835                         where << kv.first << " = " << value;
836
837                         if (first)
838                                 first = false;
839                 }
840         }
841
842         type = (typeOverride != -1) ? typeOverride : query.Type;
843
844         bool upsert = false;
845
846         if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
847                 bool hasid = false;
848
849                 if (query.Object) {
850                         if (query.ConfigUpdate)
851                                 hasid = GetConfigUpdate(query.Object);
852                         else if (query.StatusUpdate)
853                                 hasid = GetStatusUpdate(query.Object);
854                 }
855
856                 if (!hasid)
857                         upsert = true;
858
859                 type = DbQueryUpdate;
860         }
861
862         if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
863                 std::ostringstream qdel;
864                 qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
865                 Query(qdel.str());
866
867                 type = DbQueryInsert;
868         }
869
870         switch (type) {
871                 case DbQueryInsert:
872                         qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
873                         break;
874                 case DbQueryUpdate:
875                         qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
876                         break;
877                 case DbQueryDelete:
878                         qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
879                         break;
880                 default:
881                         VERIFY(!"Invalid query type.");
882         }
883
884         if (type == DbQueryInsert || type == DbQueryUpdate) {
885                 std::ostringstream colbuf, valbuf;
886
887                 if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
888                         return;
889
890                 ObjectLock olock(query.Fields);
891
892                 Value value;
893                 bool first = true;
894                 for (const Dictionary::Pair& kv : query.Fields) {
895                         if (kv.second.IsEmpty() && !kv.second.IsString())
896                                 continue;
897
898                         if (!FieldToEscapedString(kv.first, kv.second, &value)) {
899                                 m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
900                                 return;
901                         }
902
903                         if (type == DbQueryInsert) {
904                                 if (!first) {
905                                         colbuf << ", ";
906                                         valbuf << ", ";
907                                 }
908
909                                 colbuf << kv.first;
910                                 valbuf << value;
911                         } else {
912                                 if (!first)
913                                         qbuf << ", ";
914
915                                 qbuf << " " << kv.first << " = " << value;
916                         }
917
918                         if (first)
919                                 first = false;
920                 }
921
922                 if (type == DbQueryInsert)
923                         qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
924         }
925
926         if (type != DbQueryInsert)
927                 qbuf << where.str();
928
929         Query(qbuf.str());
930
931         if (upsert && GetAffectedRows() == 0) {
932                 InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert);
933
934                 return;
935         }
936
937         if (type == DbQueryInsert && query.Object) {
938                 if (query.ConfigUpdate) {
939                         String idField = query.IdColumn;
940
941                         if (idField.IsEmpty())
942                                 idField = query.Table.SubStr(0, query.Table.GetLength() - 1) + "_id";
943
944                         SetInsertID(query.Object, GetSequenceValue(GetTablePrefix() + query.Table, idField));
945
946                         SetConfigUpdate(query.Object, true);
947                 } else if (query.StatusUpdate)
948                         SetStatusUpdate(query.Object, true);
949         }
950
951         if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID) {
952                 DbReference seqval = GetSequenceValue(GetTablePrefix() + query.Table, "notification_id");
953                 query.NotificationInsertID->SetValue(static_cast<long>(seqval));
954         }
955 }
956
957 void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
958 {
959         if (IsPaused())
960                 return;
961
962         m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
963 }
964
965 void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
966 {
967         AssertOnWorkQueue();
968
969         if (!GetConnected())
970                 return;
971
972         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
973                 Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
974                 " < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ") AT TIME ZONE 'UTC'");
975 }
976
977 void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
978 {
979         String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
980         IdoPgsqlResult result = Query(query);
981
982         Dictionary::Ptr row;
983
984         int index = 0;
985         while ((row = FetchRow(result, index))) {
986                 index++;
987                 DbReference dbref(row->Get("object_id"));
988                 SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
989                 SetConfigHash(type, dbref, row->Get("config_hash"));
990         }
991 }
992
993 int IdoPgsqlConnection::GetPendingQueryCount() const
994 {
995         return m_QueryQueue.GetLength();
996 }