]> granicus.if.org Git - icinga2/blob - lib/db_ido_pgsql/idopgsqlconnection.cpp
Move PerfdataValue() class into base library
[icinga2] / lib / db_ido_pgsql / idopgsqlconnection.cpp
1 /******************************************************************************
2  * Icinga 2                                                                   *
3  * Copyright (C) 2012-2017 Icinga Development Team (https://www.icinga.com/)  *
4  *                                                                            *
5  * This program is free software; you can redistribute it and/or              *
6  * modify it under the terms of the GNU General Public License                *
7  * as published by the Free Software Foundation; either version 2             *
8  * of the License, or (at your option) any later version.                     *
9  *                                                                            *
10  * This program is distributed in the hope that it will be useful,            *
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
13  * GNU General Public License for more details.                               *
14  *                                                                            *
15  * You should have received a copy of the GNU General Public License          *
16  * along with this program; if not, write to the Free Software Foundation     *
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
18  ******************************************************************************/
19
20 #include "db_ido_pgsql/idopgsqlconnection.hpp"
21 #include "db_ido_pgsql/idopgsqlconnection.tcpp"
22 #include "db_ido/dbtype.hpp"
23 #include "db_ido/dbvalue.hpp"
24 #include "base/logger.hpp"
25 #include "base/objectlock.hpp"
26 #include "base/convert.hpp"
27 #include "base/utility.hpp"
28 #include "base/perfdatavalue.hpp"
29 #include "base/application.hpp"
30 #include "base/configtype.hpp"
31 #include "base/exception.hpp"
32 #include "base/context.hpp"
33 #include "base/statsfunction.hpp"
34 #include <boost/tuple/tuple.hpp>
35
36 using namespace icinga;
37
38 REGISTER_TYPE(IdoPgsqlConnection);
39
40 REGISTER_STATSFUNCTION(IdoPgsqlConnection, &IdoPgsqlConnection::StatsFunc);
41
42 IdoPgsqlConnection::IdoPgsqlConnection(void)
43         : m_QueryQueue(1000000)
44 {
45         m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName());
46 }
47
48 void IdoPgsqlConnection::OnConfigLoaded(void)
49 {
50         ObjectImpl<IdoPgsqlConnection>::OnConfigLoaded();
51
52         m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName());
53 }
54
55 void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
56 {
57         Dictionary::Ptr nodes = new Dictionary();
58
59         for (const IdoPgsqlConnection::Ptr& idopgsqlconnection : ConfigType::GetObjectsByType<IdoPgsqlConnection>()) {
60                 size_t items = idopgsqlconnection->m_QueryQueue.GetLength();
61
62                 Dictionary::Ptr stats = new Dictionary();
63                 stats->Set("version", idopgsqlconnection->GetSchemaVersion());
64                 stats->Set("connected", idopgsqlconnection->GetConnected());
65                 stats->Set("instance_name", idopgsqlconnection->GetInstanceName());
66                 stats->Set("query_queue_items", items);
67
68                 nodes->Set(idopgsqlconnection->GetName(), stats);
69
70                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_rate", idopgsqlconnection->GetQueryCount(60) / 60.0));
71                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_1min", idopgsqlconnection->GetQueryCount(60)));
72                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_5mins", idopgsqlconnection->GetQueryCount(5 * 60)));
73                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_15mins", idopgsqlconnection->GetQueryCount(15 * 60)));
74                 perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_items", items));
75         }
76
77         status->Set("idopgsqlconnection", nodes);
78 }
79
80 void IdoPgsqlConnection::Resume(void)
81 {
82         DbConnection::Resume();
83
84         Log(LogInformation, "IdoPgsqlConnection")
85             << "'" << GetName() << "' resumed.";
86
87         SetConnected(false);
88
89         m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1));
90
91         m_TxTimer = new Timer();
92         m_TxTimer->SetInterval(1);
93         m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoPgsqlConnection::TxTimerHandler, this));
94         m_TxTimer->Start();
95
96         m_ReconnectTimer = new Timer();
97         m_ReconnectTimer->SetInterval(10);
98         m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&IdoPgsqlConnection::ReconnectTimerHandler, this));
99         m_ReconnectTimer->Start();
100         m_ReconnectTimer->Reschedule(0);
101
102         ASSERT(PQisthreadsafe());
103 }
104
105 void IdoPgsqlConnection::Pause(void)
106 {
107         Log(LogInformation, "IdoPgsqlConnection")
108             << "'" << GetName() << "' paused.";
109
110         m_ReconnectTimer.reset();
111
112         DbConnection::Pause();
113
114         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this), PriorityHigh);
115         m_QueryQueue.Join();
116 }
117
118 void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
119 {
120         Log(LogWarning, "IdoPgsqlConnection", "Exception during database operation: Verify that your database is operational!");
121
122         Log(LogDebug, "IdoPgsqlConnection")
123             << "Exception during database operation: " << DiagnosticInformation(exp);
124
125         if (GetConnected()) {
126                 PQfinish(m_Connection);
127                 SetConnected(false);
128         }
129 }
130
131 void IdoPgsqlConnection::AssertOnWorkQueue(void)
132 {
133         ASSERT(m_QueryQueue.IsWorkerThread());
134 }
135
136 void IdoPgsqlConnection::Disconnect(void)
137 {
138         AssertOnWorkQueue();
139
140         if (!GetConnected())
141                 return;
142
143         Query("COMMIT");
144
145         PQfinish(m_Connection);
146         SetConnected(false);
147 }
148
149 void IdoPgsqlConnection::TxTimerHandler(void)
150 {
151         NewTransaction();
152 }
153
154 void IdoPgsqlConnection::NewTransaction(void)
155 {
156         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), PriorityHigh, true);
157 }
158
159 void IdoPgsqlConnection::InternalNewTransaction(void)
160 {
161         AssertOnWorkQueue();
162
163         if (!GetConnected())
164                 return;
165
166         Query("COMMIT");
167         Query("BEGIN");
168 }
169
170 void IdoPgsqlConnection::ReconnectTimerHandler(void)
171 {
172         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this), PriorityLow);
173 }
174
175 void IdoPgsqlConnection::Reconnect(void)
176 {
177         AssertOnWorkQueue();
178
179         CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'");
180
181         double startTime = Utility::GetTime();
182
183         SetShouldConnect(true);
184
185         bool reconnect = false;
186
187         if (GetConnected()) {
188                 /* Check if we're really still connected */
189                 try {
190                         Query("SELECT 1");
191                         return;
192                 } catch (const std::exception&) {
193                         PQfinish(m_Connection);
194                         SetConnected(false);
195                         reconnect = true;
196                 }
197         }
198
199         ClearIDCache();
200
201         String ihost, iport, iuser, ipasswd, idb;
202         const char *host, *port, *user , *passwd, *db;
203
204         ihost = GetHost();
205         iport = GetPort();
206         iuser = GetUser();
207         ipasswd = GetPassword();
208         idb = GetDatabase();
209
210         host = (!ihost.IsEmpty()) ? ihost.CStr() : NULL;
211         port = (!iport.IsEmpty()) ? iport.CStr() : NULL;
212         user = (!iuser.IsEmpty()) ? iuser.CStr() : NULL;
213         passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
214         db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
215
216         m_Connection = PQsetdbLogin(host, port, NULL, NULL, db, user, passwd);
217
218         if (!m_Connection)
219                 return;
220
221         if (PQstatus(m_Connection) != CONNECTION_OK) {
222                 String message = PQerrorMessage(m_Connection);
223                 PQfinish(m_Connection);
224                 SetConnected(false);
225
226                 Log(LogCritical, "IdoPgsqlConnection")
227                     << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
228                     << "' failed: \"" << message << "\"";
229
230                 BOOST_THROW_EXCEPTION(std::runtime_error(message));
231         }
232
233         SetConnected(true);
234
235         IdoPgsqlResult result;
236
237         /* explicitely require legacy mode for string escaping in PostgreSQL >= 9.1
238          * changing standard_conforming_strings to on by default
239          */
240         if (PQserverVersion(m_Connection) >= 90100)
241                 result = Query("SET standard_conforming_strings TO off");
242
243         String dbVersionName = "idoutils";
244         result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
245
246         Dictionary::Ptr row = FetchRow(result, 0);
247
248         if (!row) {
249                 PQfinish(m_Connection);
250                 SetConnected(false);
251
252                 Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
253
254                 BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
255         }
256
257         String version = row->Get("version");
258
259         SetSchemaVersion(version);
260
261         if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
262                 PQfinish(m_Connection);
263                 SetConnected(false);
264
265                 Log(LogCritical, "IdoPgsqlConnection")
266                     << "Schema version '" << version << "' does not match the required version '"
267                     << IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation at "
268                     << "https://docs.icinga.com/icinga2/latest/doc/module/icinga2/chapter/upgrading-icinga-2#upgrading-postgresql-db";
269
270                 BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
271         }
272
273         String instanceName = GetInstanceName();
274
275         result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
276         row = FetchRow(result, 0);
277
278         if (!row) {
279                 Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
280                 m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id");
281         } else {
282                 m_InstanceID = DbReference(row->Get("instance_id"));
283         }
284
285         Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
286
287         /* we have an endpoint in a cluster setup, so decide if we can proceed here */
288         if (my_endpoint && GetHAMode() == HARunOnce) {
289                 /* get the current endpoint writing to programstatus table */
290                 result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
291                     GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
292                 row = FetchRow(result, 0);
293
294                 String endpoint_name;
295
296                 if (row)
297                         endpoint_name = row->Get("endpoint_name");
298                 else
299                         Log(LogNotice, "IdoPgsqlConnection", "Empty program status table");
300
301                 /* if we did not write into the database earlier, another instance is active */
302                 if (endpoint_name != my_endpoint->GetName()) {
303                         double status_update_time;
304
305                         if (row)
306                                 status_update_time = row->Get("status_update_time");
307                         else
308                                 status_update_time = 0;
309
310                         double status_update_age = Utility::GetTime() - status_update_time;
311
312                         Log(LogNotice, "IdoPgsqlConnection")
313                             << "Last update by '" << endpoint_name << "' was " << status_update_age << "s ago.";
314
315                         if (status_update_age < GetFailoverTimeout()) {
316                                 PQfinish(m_Connection);
317                                 SetConnected(false);
318                                 SetShouldConnect(false);
319
320                                 return;
321                         }
322
323                         /* activate the IDO only, if we're authoritative in this zone */
324                         if (IsPaused()) {
325                                 Log(LogNotice, "IdoPgsqlConnection")
326                                     << "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
327
328                                 PQfinish(m_Connection);
329                                 SetConnected(false);
330
331                                 return;
332                         }
333                 }
334
335                 Log(LogNotice, "IdoPgsqlConnection", "Enabling IDO connection.");
336         }
337
338         Log(LogInformation, "IdoPgsqlConnection")
339             << "pgSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')";
340
341         Query("BEGIN");
342
343         /* update programstatus table */
344         UpdateProgramStatus();
345
346         /* record connection */
347         Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
348             "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
349             + Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), E'icinga2 db_ido_pgsql', E'" + Escape(Application::GetAppVersion())
350             + "', E'" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
351
352         /* clear config tables for the initial config dump */
353         PrepareDatabase();
354
355         std::ostringstream q1buf;
356         q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
357         result = Query(q1buf.str());
358
359         std::vector<DbObject::Ptr> activeDbObjs;
360
361         int index = 0;
362         while ((row = FetchRow(result, index))) {
363                 index++;
364
365                 DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
366
367                 if (!dbtype)
368                         continue;
369
370                 DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
371                 SetObjectID(dbobj, DbReference(row->Get("object_id")));
372                 bool active = row->Get("is_active");
373                 SetObjectActive(dbobj, active);
374
375                 if (active)
376                         activeDbObjs.push_back(dbobj);
377         }
378
379         SetIDCacheValid(true);
380
381         EnableActiveChangedHandler();
382
383         for (const DbObject::Ptr& dbobj : activeDbObjs) {
384                 if (dbobj->GetObject())
385                         continue;
386
387                 Log(LogNotice, "IdoPgsqlConnection")
388                     << "Deactivate deleted object name1: '" << dbobj->GetName1()
389                     << "' name2: '" << dbobj->GetName2() + "'.";
390                 DeactivateObject(dbobj);
391         }
392
393         UpdateAllObjects();
394
395         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityLow);
396
397         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityLow);
398 }
399
400 void IdoPgsqlConnection::FinishConnect(double startTime)
401 {
402         AssertOnWorkQueue();
403
404         if (!GetConnected())
405                 return;
406
407         Log(LogInformation, "IdoPgsqlConnection")
408             << "Finished reconnecting to PostgreSQL IDO database in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
409
410         Query("COMMIT");
411         Query("BEGIN");
412 }
413
414 void IdoPgsqlConnection::ClearTablesBySession(void)
415 {
416         /* delete all comments and downtimes without current session token */
417         ClearTableBySession("comments");
418         ClearTableBySession("scheduleddowntime");
419 }
420
421 void IdoPgsqlConnection::ClearTableBySession(const String& table)
422 {
423         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
424             Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
425             Convert::ToString(GetSessionToken()));
426 }
427
428 IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
429 {
430         AssertOnWorkQueue();
431
432         Log(LogDebug, "IdoPgsqlConnection")
433             << "Query: " << query;
434
435         IncreaseQueryCount();
436
437         PGresult *result = PQexec(m_Connection, query.CStr());
438
439         if (!result) {
440                 String message = PQerrorMessage(m_Connection);
441                 Log(LogCritical, "IdoPgsqlConnection")
442                     << "Error \"" << message << "\" when executing query \"" << query << "\"";
443
444                 BOOST_THROW_EXCEPTION(
445                     database_error()
446                         << errinfo_message(message)
447                         << errinfo_database_query(query)
448                 );
449         }
450
451         char *rowCount = PQcmdTuples(result);
452         m_AffectedRows = atoi(rowCount);
453
454         if (PQresultStatus(result) == PGRES_COMMAND_OK) {
455                 PQclear(result);
456                 return IdoPgsqlResult();
457         }
458
459         if (PQresultStatus(result) != PGRES_TUPLES_OK) {
460                 String message = PQresultErrorMessage(result);
461                 PQclear(result);
462
463                 Log(LogCritical, "IdoPgsqlConnection")
464                     << "Error \"" << message << "\" when executing query \"" << query << "\"";
465
466                 BOOST_THROW_EXCEPTION(
467                     database_error()
468                         << errinfo_message(message)
469                         << errinfo_database_query(query)
470                 );
471         }
472
473         return IdoPgsqlResult(result, std::ptr_fun(PQclear));
474 }
475
476 DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column)
477 {
478         AssertOnWorkQueue();
479
480         IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence(E'" + Escape(table) + "', E'" + Escape(column) + "')) AS id");
481
482         Dictionary::Ptr row = FetchRow(result, 0);
483
484         ASSERT(row);
485
486         Log(LogDebug, "IdoPgsqlConnection")
487             << "Sequence Value: " << row->Get("id");
488
489         return DbReference(Convert::ToLong(row->Get("id")));
490 }
491
492 int IdoPgsqlConnection::GetAffectedRows(void)
493 {
494         AssertOnWorkQueue();
495
496         return m_AffectedRows;
497 }
498
499 String IdoPgsqlConnection::Escape(const String& s)
500 {
501         AssertOnWorkQueue();
502
503         String utf8s = Utility::ValidateUTF8(s);
504
505         size_t length = utf8s.GetLength();
506         char *to = new char[utf8s.GetLength() * 2 + 1];
507
508         PQescapeStringConn(m_Connection, to, utf8s.CStr(), length, NULL);
509
510         String result = String(to);
511
512         delete [] to;
513
514         return result;
515 }
516
517 Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int row)
518 {
519         AssertOnWorkQueue();
520
521         if (row >= PQntuples(result.get()))
522                 return Dictionary::Ptr();
523
524         int columns = PQnfields(result.get());
525
526         Dictionary::Ptr dict = new Dictionary();
527
528         for (int column = 0; column < columns; column++) {
529                 Value value;
530
531                 if (!PQgetisnull(result.get(), row, column))
532                         value = PQgetvalue(result.get(), row, column);
533
534                 dict->Set(PQfname(result.get(), column), value);
535         }
536
537         return dict;
538 }
539
540 void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
541 {
542         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
543 }
544
545 void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
546 {
547         AssertOnWorkQueue();
548
549         if (!GetConnected())
550                 return;
551
552         DbReference dbref = GetObjectID(dbobj);
553         std::ostringstream qbuf;
554
555         if (!dbref.IsValid()) {
556                 if (!dbobj->GetName2().IsEmpty()) {
557                         qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
558                               << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
559                               << "E'" << Escape(dbobj->GetName1()) << "', E'" << Escape(dbobj->GetName2()) << "', 1)";
560                 } else {
561                         qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
562                              << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
563                              << "E'" << Escape(dbobj->GetName1()) << "', 1)";
564                 }
565
566                 Query(qbuf.str());
567                 SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id"));
568         } else {
569                 qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
570                 Query(qbuf.str());
571         }
572 }
573
574 void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
575 {
576         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
577 }
578
579 void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
580 {
581         AssertOnWorkQueue();
582
583         if (!GetConnected())
584                 return;
585
586         DbReference dbref = GetObjectID(dbobj);
587
588         if (!dbref.IsValid())
589                 return;
590
591         std::ostringstream qbuf;
592         qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
593         Query(qbuf.str());
594
595         /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
596          * because the object is still in the database. */
597 }
598
599 bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
600 {
601         if (key == "instance_id") {
602                 *result = static_cast<long>(m_InstanceID);
603                 return true;
604         } else if (key == "session_token") {
605                 *result = GetSessionToken();
606                 return true;
607         }
608
609         Value rawvalue = DbValue::ExtractValue(value);
610
611         if (rawvalue.IsObjectType<ConfigObject>()) {
612                 DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
613
614                 if (!dbobjcol) {
615                         *result = 0;
616                         return true;
617                 }
618
619                 if (!IsIDCacheValid())
620                         return false;
621
622                 DbReference dbrefcol;
623
624                 if (DbValue::IsObjectInsertID(value)) {
625                         dbrefcol = GetInsertID(dbobjcol);
626
627                         if (!dbrefcol.IsValid())
628                                 return false;
629                 } else {
630                         dbrefcol = GetObjectID(dbobjcol);
631
632                         if (!dbrefcol.IsValid()) {
633                                 InternalActivateObject(dbobjcol);
634
635                                 dbrefcol = GetObjectID(dbobjcol);
636
637                                 if (!dbrefcol.IsValid())
638                                         return false;
639                         }
640                 }
641
642                 *result = static_cast<long>(dbrefcol);
643         } else if (DbValue::IsTimestamp(value)) {
644                 long ts = rawvalue;
645                 std::ostringstream msgbuf;
646                 msgbuf << "TO_TIMESTAMP(" << ts << ") AT TIME ZONE 'UTC'";
647                 *result = Value(msgbuf.str());
648         } else if (DbValue::IsTimestampNow(value)) {
649                 *result = "NOW()";
650         } else if (DbValue::IsObjectInsertID(value)) {
651                 long id = static_cast<long>(rawvalue);
652
653                 if (id <= 0)
654                         return false;
655
656                 *result = id;
657                 return true;
658         } else {
659                 Value fvalue;
660
661                 if (rawvalue.IsBoolean())
662                         fvalue = Convert::ToLong(rawvalue);
663                 else
664                         fvalue = rawvalue;
665
666                 *result = "E'" + Escape(fvalue) + "'";
667         }
668
669         return true;
670 }
671
672 void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
673 {
674         ASSERT(query.Category != DbCatInvalid);
675
676         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
677 }
678
679 void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
680 {
681         if (queries.empty())
682                 return;
683
684         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
685 }
686
687 bool IdoPgsqlConnection::CanExecuteQuery(const DbQuery& query)
688 {
689         if (query.Object && !IsIDCacheValid())
690                 return false;
691
692         if (query.WhereCriteria) {
693                 ObjectLock olock(query.WhereCriteria);
694                 Value value;
695
696                 for (const Dictionary::Pair& kv : query.WhereCriteria) {
697                         if (!FieldToEscapedString(kv.first, kv.second, &value))
698                                 return false;
699                 }
700         }
701
702         if (query.Fields) {
703                 ObjectLock olock(query.Fields);
704
705                 for (const Dictionary::Pair& kv : query.Fields) {
706                         Value value;
707
708                         if (kv.second.IsEmpty() && !kv.second.IsString())
709                                 continue;
710
711                         if (!FieldToEscapedString(kv.first, kv.second, &value))
712                                 return false;
713                 }
714         }
715
716         return true;
717 }
718
719 void IdoPgsqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
720 {
721         AssertOnWorkQueue();
722
723         if (!GetConnected())
724                 return;
725
726         for (const DbQuery& query : queries) {
727                 ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
728
729                 if (!CanExecuteQuery(query)) {
730                         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), query.Priority);
731                         return;
732                 }
733         }
734
735         for (const DbQuery& query : queries) {
736                 InternalExecuteQuery(query);
737         }
738 }
739
740 void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
741 {
742         AssertOnWorkQueue();
743
744         if (!GetConnected())
745                 return;
746
747         if (query.Type == DbQueryNewTransaction) {
748                 InternalNewTransaction();
749                 return;
750         }
751
752         /* check whether we're allowed to execute the query first */
753         if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0)
754                 return;
755
756         if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
757                 return;
758
759         /* check if there are missing object/insert ids and re-enqueue the query */
760         if (!CanExecuteQuery(query)) {
761                 m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, typeOverride), query.Priority);
762                 return;
763         }
764
765         std::ostringstream qbuf, where;
766         int type;
767
768         if (query.WhereCriteria) {
769                 where << " WHERE ";
770
771                 ObjectLock olock(query.WhereCriteria);
772                 Value value;
773                 bool first = true;
774
775                 for (const Dictionary::Pair& kv : query.WhereCriteria) {
776                         if (!FieldToEscapedString(kv.first, kv.second, &value)) {
777                                 m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
778                                 return;
779                         }
780
781                         if (!first)
782                                 where << " AND ";
783
784                         where << kv.first << " = " << value;
785
786                         if (first)
787                                 first = false;
788                 }
789         }
790
791         type = (typeOverride != -1) ? typeOverride : query.Type;
792
793         bool upsert = false;
794
795         if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
796                 bool hasid = false;
797
798                 if (query.Object) {
799                         if (query.ConfigUpdate)
800                                 hasid = GetConfigUpdate(query.Object);
801                         else if (query.StatusUpdate)
802                                 hasid = GetStatusUpdate(query.Object);
803                 }
804
805                 if (!hasid)
806                         upsert = true;
807
808                 type = DbQueryUpdate;
809         }
810
811         if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
812                 std::ostringstream qdel;
813                 qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
814                 Query(qdel.str());
815
816                 type = DbQueryInsert;
817         }
818
819         switch (type) {
820                 case DbQueryInsert:
821                         qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
822                         break;
823                 case DbQueryUpdate:
824                         qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
825                         break;
826                 case DbQueryDelete:
827                         qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
828                         break;
829                 default:
830                         VERIFY(!"Invalid query type.");
831         }
832
833         if (type == DbQueryInsert || type == DbQueryUpdate) {
834                 std::ostringstream colbuf, valbuf;
835
836                 if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
837                         return;
838
839                 ObjectLock olock(query.Fields);
840
841                 Value value;
842                 bool first = true;
843                 for (const Dictionary::Pair& kv : query.Fields) {
844                         if (kv.second.IsEmpty() && !kv.second.IsString())
845                                 continue;
846
847                         if (!FieldToEscapedString(kv.first, kv.second, &value)) {
848                                 m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
849                                 return;
850                         }
851
852                         if (type == DbQueryInsert) {
853                                 if (!first) {
854                                         colbuf << ", ";
855                                         valbuf << ", ";
856                                 }
857
858                                 colbuf << kv.first;
859                                 valbuf << value;
860                         } else {
861                                 if (!first)
862                                         qbuf << ", ";
863
864                                 qbuf << " " << kv.first << " = " << value;
865                         }
866
867                         if (first)
868                                 first = false;
869                 }
870
871                 if (type == DbQueryInsert)
872                         qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
873         }
874
875         if (type != DbQueryInsert)
876                 qbuf << where.str();
877
878         Query(qbuf.str());
879
880         if (upsert && GetAffectedRows() == 0) {
881                 InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert);
882
883                 return;
884         }
885
886         if (type == DbQueryInsert && query.Object) {
887                 if (query.ConfigUpdate) {
888                         String idField = query.IdColumn;
889
890                         if (idField.IsEmpty())
891                                 idField = query.Table.SubStr(0, query.Table.GetLength() - 1) + "_id";
892
893                         SetInsertID(query.Object, GetSequenceValue(GetTablePrefix() + query.Table, idField));
894
895                         SetConfigUpdate(query.Object, true);
896                 } else if (query.StatusUpdate)
897                         SetStatusUpdate(query.Object, true);
898         }
899
900         if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID) {
901                 DbReference seqval = GetSequenceValue(GetTablePrefix() + query.Table, "notification_id");
902                 query.NotificationInsertID->SetValue(static_cast<long>(seqval));
903         }
904 }
905
906 void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
907 {
908         m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
909 }
910
911 void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
912 {
913         AssertOnWorkQueue();
914
915         if (!GetConnected())
916                 return;
917
918         Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
919             Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
920             " < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ")");
921 }
922
923 void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
924 {
925         String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
926         IdoPgsqlResult result = Query(query);
927
928         Dictionary::Ptr row;
929
930         int index = 0;
931         while ((row = FetchRow(result, index))) {
932                 index++;
933                 DbReference dbref(row->Get("object_id"));
934                 SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
935                 SetConfigHash(type, dbref, row->Get("config_hash"));
936         }
937 }
938
939 int IdoPgsqlConnection::GetPendingQueryCount(void) const
940 {
941         return m_QueryQueue.GetLength();
942 }