]> granicus.if.org Git - icinga2/commitdiff
Fix priority ordering for IDO queries
authorMichael Friedrich <michael.friedrich@netways.de>
Mon, 14 Dec 2015 10:36:03 +0000 (11:36 +0100)
committerGunnar Beutner <gunnar@beutner.name>
Tue, 23 Feb 2016 08:09:06 +0000 (09:09 +0100)
fixes #10829
refs #8714

12 files changed:
lib/base/workqueue.cpp
lib/base/workqueue.hpp
lib/db_ido/dbconnection.cpp
lib/db_ido/dbconnection.hpp
lib/db_ido/dbevents.cpp
lib/db_ido/dbevents.hpp
lib/db_ido/dbobject.cpp
lib/db_ido/dbobject.hpp
lib/db_ido_mysql/idomysqlconnection.cpp
lib/db_ido_mysql/idomysqlconnection.hpp
lib/db_ido_pgsql/idopgsqlconnection.cpp
lib/db_ido_pgsql/idopgsqlconnection.hpp

index 6a6d5fd712f1957b9abf11b287d0ea0eb956aca6..730c76bd0f631d03a31236f71be5dd7584db3baf 100644 (file)
@@ -81,7 +81,7 @@ void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueueP
                        m_CVFull.wait(lock);
        }
 
-       m_Tasks.push(Task(function, priority));
+       m_Tasks.push(Task(function, priority, ++m_NextTaskID));
 
        m_CVEmpty.notify_one();
 }
index 68be624926205bdbb3db8f7e30bf6d1aeb71ba88..320dfdc7bad64b2ec6d858e15e3d2cb78ef95296 100644 (file)
@@ -43,20 +43,31 @@ enum WorkQueuePriority
 struct Task
 {
        Task(void)
-           : Priority(PriorityNormal)
+           : Priority(PriorityNormal), ID(-1)
        { }
 
-       Task(const boost::function<void (void)>& function, WorkQueuePriority priority)
-           : Function(function), Priority(priority)
+       Task(const boost::function<void (void)>& function, WorkQueuePriority priority, int id)
+           : Function(function), Priority(priority), ID(id)
        { }
 
        boost::function<void (void)> Function;
        WorkQueuePriority Priority;
+       int ID;
 };
 
 inline bool operator<(const Task& a, const Task& b)
 {
-       return a.Priority < b.Priority;
+       if (a.Priority < b.Priority)
+               return true;
+
+       if (a.Priority == b.Priority) {
+               if (a.ID > b.ID)
+                       return true;
+               else
+                       return false;
+       }
+
+       return false;
 }
 
 /**
@@ -101,6 +112,7 @@ private:
        bool m_Stopped;
        int m_Processing;
        std::priority_queue<Task, std::deque<Task> > m_Tasks;
+       int m_NextTaskID;
        ExceptionCallback m_ExceptionCallback;
        std::vector<boost::exception_ptr> m_Exceptions;
        Timer::Ptr m_StatusTimer;
index 0ae036e4b55df2c692839bdd0a7de22ca72b1fe1..90d3ec7357eed2dab04c2d43ecc02158444547a5 100644 (file)
@@ -61,6 +61,7 @@ void DbConnection::Start(bool runtimeCreated)
        ObjectImpl<DbConnection>::Start(runtimeCreated);
 
        DbObject::OnQuery.connect(boost::bind(&DbConnection::ExecuteQuery, this, _1));
+       DbObject::OnMultipleQueries.connect(boost::bind(&DbConnection::ExecuteMultipleQueries, this, _1));
        ConfigObject::OnActiveChanged.connect(boost::bind(&DbConnection::UpdateObject, this, _1));
 }
 
@@ -131,6 +132,8 @@ void DbConnection::ProgramStatusHandler(void)
        Log(LogNotice, "DbConnection")
             << "Updating programstatus table.";
 
+       std::vector<DbQuery> queries;
+
        DbQuery query1;
        query1.Table = "programstatus";
        query1.Type = DbQueryDelete;
@@ -138,7 +141,7 @@ void DbConnection::ProgramStatusHandler(void)
        query1.WhereCriteria = new Dictionary();
        query1.WhereCriteria->Set("instance_id", 0);  /* DbConnection class fills in real ID */
        query1.Priority = PriorityHigh;
-       DbObject::OnQuery(query1);
+       queries.push_back(query1);
 
        DbQuery query2;
        query2.Table = "programstatus";
@@ -165,7 +168,9 @@ void DbConnection::ProgramStatusHandler(void)
        query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0));
        query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0));
        query2.Priority = PriorityHigh;
-       DbObject::OnQuery(query2);
+       queries.push_back(query2);
+
+       DbObject::OnMultipleQueries(queries);
 
        DbQuery query3;
        query3.Table = "runtimevariables";
@@ -351,11 +356,6 @@ bool DbConnection::GetStatusUpdate(const DbObject::Ptr& dbobj) const
        return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end());
 }
 
-void DbConnection::ExecuteQuery(const DbQuery&)
-{
-       /* Default handler does nothing. */
-}
-
 void DbConnection::UpdateObject(const ConfigObject::Ptr& object)
 {
        if (!GetConnected())
index f1b2d68a7eb9a464d4cf96845b250e2baf24a91c..e93c6b297acd64a9c3d26c1409072521fa84cd7a 100644 (file)
@@ -83,6 +83,7 @@ protected:
        virtual void Pause(void) override;
 
        virtual void ExecuteQuery(const DbQuery& query) = 0;
+       virtual void ExecuteMultipleQueries(const std::vector<DbQuery>&) = 0;
        virtual void ActivateObject(const DbObject::Ptr& dbobj) = 0;
        virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0;
 
index 49291222d2647709a42edd018ecbc9552b6d1b50..a706ed793bc125de51d765b492a279f0fa0882be 100644 (file)
@@ -45,7 +45,7 @@ void DbEvents::StaticInitialize(void)
        /* Status */
        Comment::OnCommentAdded.connect(boost::bind(&DbEvents::AddComment, _1));
        Comment::OnCommentRemoved.connect(boost::bind(&DbEvents::RemoveComment, _1));
-       Downtime::OnDowntimeAdded.connect(boost::bind(&DbEvents::AddDowntime, _1, true));
+       Downtime::OnDowntimeAdded.connect(boost::bind(&DbEvents::AddDowntime, _1));
        Downtime::OnDowntimeRemoved.connect(boost::bind(&DbEvents::RemoveDowntime, _1));
        Downtime::OnDowntimeTriggered.connect(boost::bind(&DbEvents::TriggerDowntime, _1));
        Checkable::OnAcknowledgementSet.connect(boost::bind(&DbEvents::AddAcknowledgement, _1, _4));
@@ -303,30 +303,42 @@ void DbEvents::AddComments(const Checkable::Ptr& checkable)
 {
        std::set<Comment::Ptr> comments = checkable->GetComments();
 
-       if (!comments.empty())
-               RemoveComments(checkable);
+       if (comments.empty())
+               return;
+
+       std::vector<DbQuery> queries;
+
+       DbQuery query1;
+       query1.Table = "comments";
+       query1.Type = DbQueryDelete;
+       query1.Category = DbCatComment;
+       query1.WhereCriteria = new Dictionary();
+       query1.WhereCriteria->Set("object_id", checkable);
+       queries.push_back(query1);
 
        BOOST_FOREACH(const Comment::Ptr& comment, comments) {
-               AddComment(comment);
+               AddCommentInternal(queries, comment, false);
        }
+
+       DbObject::OnMultipleQueries(queries);
 }
 
 void DbEvents::AddComment(const Comment::Ptr& comment)
 {
-       AddCommentInternal(comment, false);
+       std::vector<DbQuery> queries;
+       RemoveCommentInternal(queries, comment);
+       AddCommentInternal(queries, comment, false);
+       DbObject::OnMultipleQueries(queries);
 }
 
 void DbEvents::AddCommentHistory(const Comment::Ptr& comment)
 {
-       AddCommentInternal(comment, true);
+       std::vector<DbQuery> queries;
+       AddCommentInternal(queries, comment, true);
+       DbObject::OnMultipleQueries(queries);
 }
 
-void DbEvents::AddCommentInternal(const Comment::Ptr& comment, bool historical)
-{
-       AddCommentByType(comment, historical);
-}
-
-void DbEvents::AddCommentByType(const Comment::Ptr& comment, bool historical)
+void DbEvents::AddCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment, bool historical)
 {
        Checkable::Ptr checkable = comment->GetCheckable();
 
@@ -376,24 +388,18 @@ void DbEvents::AddCommentByType(const Comment::Ptr& comment, bool historical)
        query1.Type = DbQueryInsert;
        query1.Category = DbCatComment;
        query1.Fields = fields1;
-       DbObject::OnQuery(query1);
+
+       queries.push_back(query1);
 }
 
-void DbEvents::RemoveComments(const Checkable::Ptr& checkable)
+void DbEvents::RemoveComment(const Comment::Ptr& comment)
 {
-       Log(LogDebug, "DbEvents")
-           << "removing service comments for '" << checkable->GetName() << "'";
-
-       DbQuery query1;
-       query1.Table = "comments";
-       query1.Type = DbQueryDelete;
-       query1.Category = DbCatComment;
-       query1.WhereCriteria = new Dictionary();
-       query1.WhereCriteria->Set("object_id", checkable);
-       DbObject::OnQuery(query1);
+       std::vector<DbQuery> queries;
+       RemoveCommentInternal(queries, comment);
+       DbObject::OnMultipleQueries(queries);
 }
 
-void DbEvents::RemoveComment(const Comment::Ptr& comment)
+void DbEvents::RemoveCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment)
 {
        Checkable::Ptr checkable = comment->GetCheckable();
 
@@ -425,6 +431,7 @@ void DbEvents::RemoveComment(const Comment::Ptr& comment)
 
        query2.WhereCriteria = new Dictionary();
        query2.WhereCriteria->Set("internal_comment_id", comment->GetLegacyId());
+       query2.WhereCriteria->Set("object_id", checkable);
        query2.WhereCriteria->Set("comment_time", DbValue::FromTimestamp(entry_time));
        query2.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
 
@@ -436,37 +443,42 @@ void DbEvents::AddDowntimes(const Checkable::Ptr& checkable)
 {
        std::set<Downtime::Ptr> downtimes = checkable->GetDowntimes();
 
-       if (!downtimes.empty())
-               RemoveDowntimes(checkable);
+       if (downtimes.empty())
+               return;
+
+       std::vector<DbQuery> queries;
+
+       DbQuery query1;
+       query1.Table = "scheduleddowntime";
+       query1.Type = DbQueryDelete;
+       query1.Category = DbCatDowntime;
+       query1.WhereCriteria = new Dictionary();
+       query1.WhereCriteria->Set("object_id", checkable);
+       queries.push_back(query1);
 
        BOOST_FOREACH(const Downtime::Ptr& downtime, downtimes) {
-               AddDowntime(downtime, false);
+               AddDowntimeInternal(queries, downtime, false);
        }
-}
 
-void DbEvents::AddDowntime(const Downtime::Ptr& downtime, bool remove_existing)
-{
-       /*
-        * make sure to delete any old downtime to avoid multiple inserts from
-        * configured ScheduledDowntime dumps and CreateNextDowntime() calls
-        */
-       if (remove_existing)
-               RemoveDowntime(downtime);
-
-       AddDowntimeInternal(downtime, false);
+       DbObject::OnMultipleQueries(queries);
 }
 
-void DbEvents::AddDowntimeHistory(const Downtime::Ptr& downtime)
+void DbEvents::AddDowntime(const Downtime::Ptr& downtime)
 {
-       AddDowntimeInternal(downtime, true);
+       std::vector<DbQuery> queries;
+       RemoveDowntimeInternal(queries, downtime);
+       AddDowntimeInternal(queries, downtime, false);
+       DbObject::OnMultipleQueries(queries);
 }
 
-void DbEvents::AddDowntimeInternal(const Downtime::Ptr& downtime, bool historical)
+void DbEvents::AddDowntimeHistory(const Downtime::Ptr& downtime)
 {
-       AddDowntimeByType(downtime, historical);
+       std::vector<DbQuery> queries;
+       AddDowntimeInternal(queries, downtime, false);
+       DbObject::OnMultipleQueries(queries);
 }
 
-void DbEvents::AddDowntimeByType(const Downtime::Ptr& downtime, bool historical)
+void DbEvents::AddDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime, bool historical)
 {
        Checkable::Ptr checkable = downtime->GetCheckable();
 
@@ -514,21 +526,18 @@ void DbEvents::AddDowntimeByType(const Downtime::Ptr& downtime, bool historical)
        query1.Type = DbQueryInsert;
        query1.Category = DbCatDowntime;
        query1.Fields = fields1;
-       DbObject::OnQuery(query1);
+
+       queries.push_back(query1);
 }
 
-void DbEvents::RemoveDowntimes(const Checkable::Ptr& checkable)
+void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
 {
-       DbQuery query1;
-       query1.Table = "scheduleddowntime";
-       query1.Type = DbQueryDelete;
-       query1.Category = DbCatDowntime;
-       query1.WhereCriteria = new Dictionary();
-       query1.WhereCriteria->Set("object_id", checkable);
-       DbObject::OnQuery(query1);
+       std::vector<DbQuery> queries;
+       RemoveDowntimeInternal(queries, downtime);
+       DbObject::OnMultipleQueries(queries);
 }
 
-void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
+void DbEvents::RemoveDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime)
 {
        Checkable::Ptr checkable = downtime->GetCheckable();
 
@@ -541,7 +550,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
        query1.WhereCriteria->Set("object_id", checkable);
        query1.WhereCriteria->Set("internal_downtime_id", downtime->GetLegacyId());
        query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
-       DbObject::OnQuery(query1);
+       queries.push_back(query1);
 
        /* History - update actual_end_time, was_cancelled for service (and host in case) */
        double now = Utility::GetTime();
@@ -564,7 +573,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
        query3.WhereCriteria->Set("entry_time", DbValue::FromTimestamp(downtime->GetEntryTime()));
        query3.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
 
-       DbObject::OnQuery(query3);
+       queries.push_back(query3);
 
        /* host/service status */
        Host::Ptr host;
@@ -577,7 +586,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
        else
                query4.Table = "hoststatus";
 
-       query4.Type = DbQueryInsert | DbQueryUpdate;
+       query4.Type = DbQueryUpdate;
        query4.Category = DbCatState;
        query4.StatusUpdate = true;
        query4.Object = DbObject::GetOrCreateByObject(checkable);
@@ -595,7 +604,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
 
        query4.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
 
-       DbObject::OnQuery(query4);
+       queries.push_back(query4);
 }
 
 void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime)
@@ -660,7 +669,7 @@ void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime)
        else
                query4.Table = "hoststatus";
 
-       query4.Type = DbQueryInsert | DbQueryUpdate;
+       query4.Type = DbQueryUpdate;
        query4.Category = DbCatState;
        query4.StatusUpdate = true;
        query4.Object = DbObject::GetOrCreateByObject(checkable);
@@ -754,7 +763,7 @@ void DbEvents::AddAcknowledgementInternal(const Checkable::Ptr& checkable, Ackno
        else
                query1.Table = "hoststatus";
 
-       query1.Type = DbQueryInsert | DbQueryUpdate;
+       query1.Type = DbQueryUpdate;
        query1.Category = DbCatState;
        query1.StatusUpdate = true;
        query1.Object = DbObject::GetOrCreateByObject(checkable);
index 7efd46b07d1600ccf5e9b70acccf61dfc3b9f82b..89df2b4690e14f85e58792fa2680fc3ffa513dc4 100644 (file)
@@ -61,11 +61,8 @@ class DbEvents
 public:
        static void StaticInitialize(void);
 
-       static void AddCommentByType(const Comment::Ptr& comment, bool historical);
        static void AddComments(const Checkable::Ptr& checkable);
-       static void RemoveComments(const Checkable::Ptr& checkable);
 
-       static void AddDowntimeByType(const Downtime::Ptr& downtime, bool historical);
        static void AddDowntimes(const Checkable::Ptr& checkable);
        static void RemoveDowntimes(const Checkable::Ptr& checkable);
 
@@ -85,7 +82,7 @@ public:
        static void AddComment(const Comment::Ptr& comment);
        static void RemoveComment(const Comment::Ptr& comment);
 
-       static void AddDowntime(const Downtime::Ptr& downtime, bool remove_existing);
+       static void AddDowntime(const Downtime::Ptr& downtime);
        static void RemoveDowntime(const Downtime::Ptr& downtime);
        static void TriggerDowntime(const Downtime::Ptr& downtime);
 
@@ -130,8 +127,10 @@ public:
 private:
        DbEvents(void);
 
-       static void AddCommentInternal(const Comment::Ptr& comment, bool historical);
-       static void AddDowntimeInternal(const Downtime::Ptr& downtime, bool historical);
+       static void AddCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment, bool historical);
+       static void RemoveCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment);
+       static void AddDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime, bool historical);
+       static void RemoveDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime);
        static void EnableChangedHandlerInternal(const Checkable::Ptr& checkable, const String& fieldName, bool enabled);
 };
 
index 3d203a59cc694508bcd756ef27529b8beb8d16b7..003a53d8aba98f87c2fc62e46cdb7962233a3459 100644 (file)
@@ -37,6 +37,7 @@
 using namespace icinga;
 
 boost::signals2::signal<void (const DbQuery&)> DbObject::OnQuery;
+boost::signals2::signal<void (const std::vector<DbQuery>&)> DbObject::OnMultipleQueries;
 
 INITIALIZE_ONCE(&DbObject::StaticInitialize);
 
index 8000428b991baae197d8cdf72365a05cdbf7849c..486779421a256c7e337fe7525e0de27b9a935fee 100644 (file)
@@ -79,6 +79,7 @@ public:
        static DbObject::Ptr GetOrCreateByObject(const ConfigObject::Ptr& object);
 
        static boost::signals2::signal<void (const DbQuery&)> OnQuery;
+       static boost::signals2::signal<void (const std::vector<DbQuery>&)> OnMultipleQueries;
 
        void SendConfigUpdate(void);
        void SendStatusUpdate(void);
index b39a1b0987e629896dbf3464d3a0b80cf8f54808..697b7395e5d56298efac85a39a0bddf541c9eb9d 100644 (file)
@@ -748,9 +748,21 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
 {
        ASSERT(query.Category != DbCatInvalid);
 
+       boost::mutex::scoped_lock lock(m_QueryQueueMutex);
        m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
 }
 
+void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
+{
+       boost::mutex::scoped_lock lock(m_QueryQueueMutex);
+
+       BOOST_FOREACH(const DbQuery& query, queries) {
+               ASSERT(query.Category != DbCatInvalid);
+
+               m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
+       }
+}
+
 void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
 {
        AssertOnWorkQueue();
@@ -839,8 +851,10 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
                        if (kv.second.IsEmpty() && !kv.second.IsString())
                                continue;
 
-                       if (!FieldToEscapedString(kv.first, kv.second, &value))
+                       if (!FieldToEscapedString(kv.first, kv.second, &value)) {
+                               m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
                                return;
+                       }
 
                        if (type == DbQueryInsert) {
                                if (!first) {
index 3226f0bdef82388fa3f3f92d13260f44fbb8336d..7f1f6dfbda76fe6c9b4b3c678f5e2d79d1be3444 100644 (file)
@@ -63,6 +63,7 @@ protected:
        virtual void ActivateObject(const DbObject::Ptr& dbobj) override;
        virtual void DeactivateObject(const DbObject::Ptr& dbobj) override;
        virtual void ExecuteQuery(const DbQuery& query) override;
+       virtual void ExecuteMultipleQueries(const std::vector<DbQuery>& queries);
        virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) override;
        virtual void FillIDCache(const DbType::Ptr& type) override;
        virtual void NewTransaction(void) override;
@@ -72,6 +73,7 @@ private:
        int m_SessionToken;
 
        WorkQueue m_QueryQueue;
+       boost::mutex m_QueryQueueMutex;
 
        MYSQL m_Connection;
        int m_AffectedRows;
index 5e3cd6632c627d4ef27e656d3956a47fcaf68f84..a0a63007df894667e31c925786c71302aacf2a8f 100644 (file)
@@ -630,9 +630,21 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
 {
        ASSERT(query.Category != DbCatInvalid);
 
+       boost::mutex::scoped_lock lock(m_QueryQueueMutex);
        m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
 }
 
+void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
+{
+       boost::mutex::scoped_lock lock(m_QueryQueueMutex);
+
+       BOOST_FOREACH(const DbQuery& query, queries) {
+               ASSERT(query.Category != DbCatInvalid);
+
+               m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
+       }
+}
+
 void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
 {
        AssertOnWorkQueue();
@@ -657,8 +669,10 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
                bool first = true;
 
                BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) {
-                       if (!FieldToEscapedString(kv.first, kv.second, &value))
+                       if (!FieldToEscapedString(kv.first, kv.second, &value)) {
+                               m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
                                return;
+                       }
 
                        if (!first)
                                where << " AND ";
@@ -718,8 +732,10 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
                        if (kv.second.IsEmpty() && !kv.second.IsString())
                                continue;
 
-                       if (!FieldToEscapedString(kv.first, kv.second, &value))
+                       if (!FieldToEscapedString(kv.first, kv.second, &value)) {
+                               m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
                                return;
+                       }
 
                        if (type == DbQueryInsert) {
                                if (!first) {
index c8db6911c03d423783dafd25b7236f0305991dd3..135a0b1114edcefc981e6387db8c8e6e4ee889ee 100644 (file)
@@ -55,6 +55,7 @@ protected:
        virtual void ActivateObject(const DbObject::Ptr& dbobj) override;
        virtual void DeactivateObject(const DbObject::Ptr& dbobj) override;
        virtual void ExecuteQuery(const DbQuery& query) override;
+       virtual void ExecuteMultipleQueries(const std::vector<DbQuery>& queries);
        virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) override;
        virtual void FillIDCache(const DbType::Ptr& type) override;
        virtual void NewTransaction(void) override;
@@ -64,6 +65,7 @@ private:
        int m_SessionToken;
 
        WorkQueue m_QueryQueue;
+       boost::mutex m_QueryQueueMutex;
 
        PGconn *m_Connection;
        int m_AffectedRows;